This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e026384ffb HOTFIX: Only measure in nano when producer metadata refresh
is required (#12102)
e026384ffb is described below
commit e026384ffb3170a2e71053a4163db58b9bd8fba6
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Apr 27 11:27:54 2022 -0700
HOTFIX: Only measure in nano when producer metadata refresh is required
(#12102)
We added the metadata wait time to total blocked time (#11805). But we
added it in the critical path of send, which is called per record, whereas
metadata refresh happens only rarely. This way the cost of time.nanos becomes
unnecessarily significant, as we call it twice per record.
This PR moves the call into the waitOnMetadata callee, so it is made only when we
actually need to wait for a metadata refresh round-trip (i.e. when we are indeed blocking).
Reviewers: Matthias J. Sax <[email protected]>
---
.../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f36db02a9d..85a3e239e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -931,7 +931,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
- long nowNanos = time.nanoseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(),
record.partition(), nowMs, maxBlockTimeMs);
@@ -941,7 +940,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
- producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
long remainingWaitMs = Math.max(0, maxBlockTimeMs -
clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
@@ -1080,6 +1078,7 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
// Issue metadata requests until we have metadata for the topic and
the requested partition,
// or until maxWaitTimeMs is exceeded. This is necessary in case the
metadata
// is stale and the number of partitions for this topic has increased
in the meantime.
+ long nowNanos = time.nanoseconds();
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of
topic {}.", partition, topic);
@@ -1111,6 +1110,8 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >=
partitionsCount));
+ producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
+
return new ClusterAndWaitTime(cluster, elapsed);
}