This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new fa88333039f Kafka-14743: update request metrics after callback (#13297)
fa88333039f is described below

commit fa88333039fee0d26778c78c7b010e1b751d4c96
Author: Luke Chen <[email protected]>
AuthorDate: Sun Feb 26 15:16:51 2023 +0800

    Kafka-14743: update request metrics after callback (#13297)
    
    Currently, the 
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Fetch 
metric is never updated, because the request metrics are recorded BEFORE the 
messageConversions metric values are updated. That means that even when the 
messageConversions metric values are updated, the request metrics never reflect 
the update. This patch fixes it by updating the request metrics after the 
callback has completed, so that the messageConversions metric values are 
recorded correctly.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Divij Vaidya 
<[email protected]>
---
 .../main/scala/kafka/network/SocketServer.scala    |  7 ++++---
 .../FetchRequestDownConversionConfigTest.scala     | 24 +++++++++++++++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 12 ++++++++++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index e91c240415c..0c08d7b056a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1130,10 +1130,11 @@ private[kafka] class Processor(
         val response = inflightResponses.remove(send.destinationId).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destinationId} 
completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(response)
-
-        // Invoke send completion callback
+        
+        // Invoke the send completion callback first, then update the request metrics, 
since the callback
+        // may itself update some of the request metrics
         response.onComplete.foreach(onComplete => onComplete(send))
+        updateRequestMetrics(response)
 
         // Try unmuting the channel. If there was no quota violation and the 
channel has not been throttled,
         // it will be unmuted immediately. If the channel has been throttled, 
it will unmuted only if the throttling
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 56a3485da40..0a86384a3d3 100644
--- 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util
 import java.util.{Optional, Properties}
-
 import kafka.log.LogConfig
+import kafka.network.RequestMetrics.{MessageConversionsTimeMs, 
TemporaryMemoryBytes}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.message.FetchResponseData
@@ -164,6 +164,12 @@ class FetchRequestDownConversionConfigTest extends 
BaseRequestTest {
   }
 
   def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+    val fetchRequest = "request=Fetch"
+    val fetchTemporaryMemoryBytesMetricName = 
s"$TemporaryMemoryBytes,$fetchRequest"
+    val fetchMessageConversionsTimeMsMetricName = 
s"$MessageConversionsTimeMs,$fetchRequest"
+    val initialFetchMessageConversionsPerSec = 
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
+    val initialFetchMessageConversionsTimeMs = 
TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)
+    val initialFetchTemporaryMemoryBytes = 
TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)
     val topicWithDownConversionEnabled = "foo"
     val topicWithDownConversionDisabled = "bar"
     val replicaIds = brokers.map(_.config.brokerId)
@@ -216,12 +222,28 @@ class FetchRequestDownConversionConfigTest extends 
BaseRequestTest {
       Errors.forCode(fetchResponseData.get(tp).errorCode)
     }
 
+    def verifyMetrics(): Unit = {
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > 
initialFetchMessageConversionsPerSec,
+        s"The `FetchMessageConversionsPerSec` metric count is not incremented 
after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsPerSec final: 
${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > 
initialFetchMessageConversionsTimeMs,
+        s"The `MessageConversionsTimeMs` in fetch request metric count is not 
incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsTimeMs final: 
${TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)}", 5000)
+
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) > 
initialFetchTemporaryMemoryBytes,
+        s"The `TemporaryMemoryBytes` in fetch request metric count is not 
incremented after 5 seconds. " +
+          s"init: $initialFetchTemporaryMemoryBytes final: 
${TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)}", 5000)
+    }
+
     assertEquals(Errors.NONE, error(partitionWithDownConversionEnabled))
     if (isFollowerFetch) {
       assertEquals(Errors.NONE, error(partitionWithDownConversionDisabled))
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, 
error(partitionWithDownConversionDisabled))
     }
+
+    verifyMetrics()
   }
 
   private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e21214b6f7e..525ff734f51 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,7 +28,7 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, 
Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
@@ -2089,6 +2089,16 @@ object TestUtils extends Logging {
       .count
   }
 
+  def metersCount(metricName: String): Long = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+      .values.map {
+        case histogram: Histogram => histogram.count()
+        case meter: Meter => meter.count()
+        case _ => 0
+      }.sum
+  }
+
   def clearYammerMetrics(): Unit = {
     for (metricName <- 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
       KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)

Reply via email to