This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new f4b46a7cec5 KAFKA-17731: Removed timed waiting signal for client telemetry close (#17431)
f4b46a7cec5 is described below
commit f4b46a7cec533eac4d4433953a722454d947a98a
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Oct 10 12:19:05 2024 +0100
KAFKA-17731: Removed timed waiting signal for client telemetry close (#17431)
Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 2 +-
.../consumer/internals/LegacyKafkaConsumer.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 4 ++-
.../internals/ClientTelemetryReporter.java | 24 ++++---------
.../telemetry/internals/ClientTelemetrySender.java | 4 +--
.../internals/ClientTelemetryReporterTest.java | 42 ++++++++++++++++++++++
6 files changed, 54 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index ad3e9131f13..a671c2a7508 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1219,7 +1219,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
AtomicReference<Throwable> firstException = new AtomicReference<>();
final Timer closeTimer = time.timer(timeout);
- clientTelemetryReporter.ifPresent(reporter ->
reporter.initiateClose(timeout.toMillis()));
+
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Prepare shutting down the network thread
prepareShutdown(closeTimer, firstException);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
index bcc8cb40f2d..d1a17df4110 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
@@ -1124,7 +1124,7 @@ public class LegacyKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
AtomicReference<Throwable> firstException = new AtomicReference<>();
final Timer closeTimer = createTimerForRequest(timeout);
- clientTelemetryReporter.ifPresent(reporter ->
reporter.initiateClose(timeout.toMillis()));
+
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Close objects with a timeout. The timeout is required because the
coordinator & the fetcher send requests to
// the server in the process of closing which may not respect the
overall timeout defined for closing the
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 94273358e7d..33e743703fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1356,6 +1356,9 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
} else {
// Try to close gracefully.
final Timer closeTimer = time.timer(timeout);
+
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
+ closeTimer.update();
+
if (this.sender != null) {
this.sender.initiateClose();
closeTimer.update();
@@ -1370,7 +1373,6 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
closeTimer.update();
}
}
- clientTelemetryReporter.ifPresent(reporter ->
reporter.initiateClose(closeTimer.remainingMs()));
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 722280e9cbc..ed6f27f4f0b 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -218,10 +218,10 @@ public class ClientTelemetryReporter implements
MetricsReporter {
telemetryProvider.updateLabels(labels);
}
- public void initiateClose(long timeoutMs) {
+ public void initiateClose() {
log.debug("Initiate close of ClientTelemetryReporter");
try {
- clientTelemetrySender.initiateClose(timeoutMs);
+ clientTelemetrySender.initiateClose();
} catch (Exception exception) {
log.error("Failed to initiate close of client telemetry reporter",
exception);
}
@@ -271,7 +271,6 @@ public class ClientTelemetryReporter implements
MetricsReporter {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Condition subscriptionLoaded =
lock.writeLock().newCondition();
- private final Condition terminalPushInProgress =
lock.writeLock().newCondition();
/*
Initial state should be subscription needed which should allow
issuing first telemetry
request of get telemetry subscription.
@@ -599,8 +598,8 @@ public class ClientTelemetryReporter implements
MetricsReporter {
}
@Override
- public void initiateClose(long timeoutMs) {
- log.debug("initiate close for client telemetry, check if terminal
push required. Timeout {} ms.", timeoutMs);
+ public void initiateClose() {
+ log.debug("initiate close for client telemetry, check if terminal
push required.");
lock.writeLock().lock();
try {
@@ -620,14 +619,7 @@ public class ClientTelemetryReporter implements
MetricsReporter {
return;
}
- try {
- log.info("About to wait {} ms. for terminal telemetry push
to be submitted", timeoutMs);
- if (!terminalPushInProgress.await(timeoutMs,
TimeUnit.MILLISECONDS)) {
- log.info("Wait for terminal telemetry push to be
submitted has elapsed; may not have actually sent request");
- }
- } catch (InterruptedException e) {
- log.warn("Error during client telemetry close", e);
- }
+ log.debug("Updated state to send terminal telemetry push
request");
} finally {
lock.writeLock().unlock();
}
@@ -813,13 +805,9 @@ public class ClientTelemetryReporter implements
MetricsReporter {
ClientTelemetryState oldState = state;
state = oldState.validateTransition(newState);
log.debug("Setting telemetry state from {} to {}", oldState,
newState);
-
- if (newState ==
ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS) {
- terminalPushInProgress.signalAll();
- }
return true;
} catch (IllegalStateException e) {
- log.warn("Error updating client telemetry state, disabled
telemetry", e);
+ log.warn("Error updating client telemetry state, disabled
telemetry");
enabled = false;
return false;
} finally {
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
index e6bb9d40e73..99a50ecebd9 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
@@ -107,8 +107,6 @@ public interface ClientTelemetrySender extends
AutoCloseable {
/**
* Initiates shutdown of this client. This method is called when the
enclosing client instance
* is being closed. This method should not throw an exception if the
client is already closed.
- *
- * @param timeoutMs The maximum time to wait for the client to close.
*/
- void initiateClose(long timeoutMs);
+ void initiateClose();
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index 2eed8d51195..c2ea26ecf37 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -646,6 +646,48 @@ public class ClientTelemetryReporterTest {
assertTrue(timeMs >= 500 && timeMs <= 1500);
}
+ @Test
+ public void testTelemetryReporterInitiateClose() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
+ clientTelemetryReporter.initiateClose();
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED,
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+ }
+
+ @Test
+ public void testTelemetryReporterInitiateCloseNoSubscription() {
+ clientTelemetryReporter.initiateClose();
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+ }
+
+ @Test
+ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
+ clientTelemetryReporter.initiateClose();
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED,
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+ clientTelemetryReporter.initiateClose();
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS,
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+ clientTelemetryReporter.initiateClose();
+ assertEquals(ClientTelemetryState.TERMINATED,
((ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter
+ .telemetrySender()).state());
+ }
+
@AfterEach
public void tearDown() {
clientTelemetryReporter.close();