This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e026384ffb HOTFIX: Only measure in nano when producer metadata refresh 
is required (#12102)
e026384ffb is described below

commit e026384ffb3170a2e71053a4163db58b9bd8fba6
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Apr 27 11:27:54 2022 -0700

    HOTFIX: Only measure in nano when producer metadata refresh is required 
(#12102)
    
    We added the metadata wait time to the total blocked time (#11805), but we 
added it in the critical path of send, which is called per record, whereas 
metadata refresh happens only rarely. As a result, the cost of 
time.nanoseconds() becomes unnecessarily significant, since we call it twice 
per record.
    
    This PR moves the nanosecond measurement (and the recording of the metric) 
inside the waitOnMetadata callee, so that it happens only when we do need to 
wait for a metadata refresh round-trip (i.e. we are indeed blocking).
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f36db02a9d..85a3e239e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -931,7 +931,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             throwIfProducerClosed();
             // first make sure the metadata for the topic is available
             long nowMs = time.milliseconds();
-            long nowNanos = time.nanoseconds();
             ClusterAndWaitTime clusterAndWaitTime;
             try {
                 clusterAndWaitTime = waitOnMetadata(record.topic(), 
record.partition(), nowMs, maxBlockTimeMs);
@@ -941,7 +940,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 throw e;
             }
             nowMs += clusterAndWaitTime.waitedOnMetadataMs;
-            producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
             long remainingWaitMs = Math.max(0, maxBlockTimeMs - 
clusterAndWaitTime.waitedOnMetadataMs);
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
@@ -1080,6 +1078,7 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         // Issue metadata requests until we have metadata for the topic and 
the requested partition,
         // or until maxWaitTimeMs is exceeded. This is necessary in case the 
metadata
         // is stale and the number of partitions for this topic has increased 
in the meantime.
+        long nowNanos = time.nanoseconds();
         do {
             if (partition != null) {
                 log.trace("Requesting metadata update for partition {} of 
topic {}.", partition, topic);
@@ -1111,6 +1110,8 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             partitionsCount = cluster.partitionCountForTopic(topic);
         } while (partitionsCount == null || (partition != null && partition >= 
partitionsCount));
 
+        producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
+
         return new ClusterAndWaitTime(cluster, elapsed);
     }
 

Reply via email to