This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new f4b46a7cec5 KAFKA-17731: Removed timed waiting signal for client telemetry close (#17431)
f4b46a7cec5 is described below

commit f4b46a7cec533eac4d4433953a722454d947a98a
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Oct 10 12:19:05 2024 +0100

    KAFKA-17731: Removed timed waiting signal for client telemetry close (#17431)
    
    Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  2 +-
 .../consumer/internals/LegacyKafkaConsumer.java    |  2 +-
 .../kafka/clients/producer/KafkaProducer.java      |  4 ++-
 .../internals/ClientTelemetryReporter.java         | 24 ++++---------
 .../telemetry/internals/ClientTelemetrySender.java |  4 +--
 .../internals/ClientTelemetryReporterTest.java     | 42 ++++++++++++++++++++++
 6 files changed, 54 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index ad3e9131f13..a671c2a7508 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1219,7 +1219,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
 
         final Timer closeTimer = time.timer(timeout);
-        clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
+        
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
         closeTimer.update();
         // Prepare shutting down the network thread
         prepareShutdown(closeTimer, firstException);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
index bcc8cb40f2d..d1a17df4110 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
@@ -1124,7 +1124,7 @@ public class LegacyKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
 
         final Timer closeTimer = createTimerForRequest(timeout);
-        clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
+        
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
         closeTimer.update();
         // Close objects with a timeout. The timeout is required because the 
coordinator & the fetcher send requests to
         // the server in the process of closing which may not respect the 
overall timeout defined for closing the
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 94273358e7d..33e743703fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1356,6 +1356,9 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             } else {
                 // Try to close gracefully.
                 final Timer closeTimer = time.timer(timeout);
+                
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
+                closeTimer.update();
+
                 if (this.sender != null) {
                     this.sender.initiateClose();
                     closeTimer.update();
@@ -1370,7 +1373,6 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
                         closeTimer.update();
                     }
                 }
-                clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(closeTimer.remainingMs()));
             }
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 722280e9cbc..ed6f27f4f0b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -218,10 +218,10 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
         telemetryProvider.updateLabels(labels);
     }
 
-    public void initiateClose(long timeoutMs) {
+    public void initiateClose() {
         log.debug("Initiate close of ClientTelemetryReporter");
         try {
-            clientTelemetrySender.initiateClose(timeoutMs);
+            clientTelemetrySender.initiateClose();
         } catch (Exception exception) {
             log.error("Failed to initiate close of client telemetry reporter", 
exception);
         }
@@ -271,7 +271,6 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
 
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
         private final Condition subscriptionLoaded = 
lock.writeLock().newCondition();
-        private final Condition terminalPushInProgress = 
lock.writeLock().newCondition();
         /*
          Initial state should be subscription needed which should allow 
issuing first telemetry
          request of get telemetry subscription.
@@ -599,8 +598,8 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
         }
 
         @Override
-        public void initiateClose(long timeoutMs) {
-            log.debug("initiate close for client telemetry, check if terminal 
push required. Timeout {} ms.", timeoutMs);
+        public void initiateClose() {
+            log.debug("initiate close for client telemetry, check if terminal 
push required.");
 
             lock.writeLock().lock();
             try {
@@ -620,14 +619,7 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
                     return;
                 }
 
-                try {
-                    log.info("About to wait {} ms. for terminal telemetry push 
to be submitted", timeoutMs);
-                    if (!terminalPushInProgress.await(timeoutMs, 
TimeUnit.MILLISECONDS)) {
-                        log.info("Wait for terminal telemetry push to be 
submitted has elapsed; may not have actually sent request");
-                    }
-                } catch (InterruptedException e) {
-                    log.warn("Error during client telemetry close", e);
-                }
+                log.debug("Updated state to send terminal telemetry push 
request");
             } finally {
                 lock.writeLock().unlock();
             }
@@ -813,13 +805,9 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
                 ClientTelemetryState oldState = state;
                 state = oldState.validateTransition(newState);
                 log.debug("Setting telemetry state from {} to {}", oldState, 
newState);
-
-                if (newState == 
ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
-                    terminalPushInProgress.signalAll();
-                }
                 return true;
             } catch (IllegalStateException e) {
-                log.warn("Error updating client telemetry state, disabled 
telemetry", e);
+                log.warn("Error updating client telemetry state, disabled 
telemetry");
                 enabled = false;
                 return false;
             } finally {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
index e6bb9d40e73..99a50ecebd9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
@@ -107,8 +107,6 @@ public interface ClientTelemetrySender extends 
AutoCloseable {
     /**
      * Initiates shutdown of this client. This method is called when the 
enclosing client instance
      * is being closed. This method should not throw an exception if the 
client is already closed.
-     *
-     * @param timeoutMs The maximum time to wait for the client to close.
      */
-    void initiateClose(long timeoutMs);
+    void initiateClose();
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index 2eed8d51195..c2ea26ecf37 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -646,6 +646,48 @@ public class ClientTelemetryReporterTest {
         assertTrue(timeMs >= 500 && timeMs <= 1500);
     }
 
+    @Test
+    public void testTelemetryReporterInitiateClose() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+        clientTelemetryReporter.initiateClose();
+        assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, 
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+            .telemetrySender()).state());
+    }
+
+    @Test
+    public void testTelemetryReporterInitiateCloseNoSubscription() {
+        clientTelemetryReporter.initiateClose();
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+            .telemetrySender()).state());
+    }
+
+    @Test
+    public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
+        clientTelemetryReporter.initiateClose();
+        assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, 
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+            .telemetrySender()).state());
+
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+        clientTelemetryReporter.initiateClose();
+        assertEquals(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS, 
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+            .telemetrySender()).state());
+
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+        clientTelemetryReporter.initiateClose();
+        assertEquals(ClientTelemetryState.TERMINATED, 
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+            .telemetrySender()).state());
+    }
+
     @AfterEach
     public void tearDown() {
         clientTelemetryReporter.close();

Reply via email to