This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae672574065 Kafka-14743: update request metrics after callback (#13297)
ae672574065 is described below

commit ae6725740651cc76280840af3a657f9bb9522e37
Author: Luke Chen <[email protected]>
AuthorDate: Sun Feb 26 15:16:51 2023 +0800

    Kafka-14743: update request metrics after callback (#13297)
    
    Currently, the 
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Fetch 
will not get updated because the request metrics is recorded BEFORE the 
messageConversions metrics value updated. That means, even if we updated the 
messageConversions metrics value, the request metrics will never reflect the 
update. This patch fixes it by updating the request metric after callback 
completed, so that the messageConversions metric value can be updated correctly.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Divij Vaidya 
<[email protected]>
---
 .../main/scala/kafka/network/SocketServer.scala    |  7 +++---
 .../FetchRequestDownConversionConfigTest.scala     | 25 +++++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 10 ++++++---
 3 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index dca6c108374..6807a2a82fc 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1175,10 +1175,11 @@ private[kafka] class Processor(
         val response = inflightResponses.remove(send.destinationId).getOrElse {
           throw new IllegalStateException(s"Send for ${send.destinationId} 
completed, but not in `inflightResponses`")
         }
-        updateRequestMetrics(response)
-
-        // Invoke send completion callback
+        
+        // Invoke send completion callback, and then update request metrics 
since there might be some
+        // request metrics got updated during callback
         response.onComplete.foreach(onComplete => onComplete(send))
+        updateRequestMetrics(response)
 
         // Try unmuting the channel. If there was no quota violation and the 
channel has not been throttled,
         // it will be unmuted immediately. If the channel has been throttled, 
it will unmuted only if the throttling
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 3f8ba8e099a..d01228c1d13 100644
--- 
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import java.util
 import java.util.{Optional, Properties}
+import kafka.network.RequestMetrics.{MessageConversionsTimeMs, 
TemporaryMemoryBytes}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.TopicConfig
@@ -163,7 +164,12 @@ class FetchRequestDownConversionConfigTest extends 
BaseRequestTest {
   }
 
   def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+    val fetchRequest = "request=Fetch"
+    val fetchTemporaryMemoryBytesMetricName = 
s"$TemporaryMemoryBytes,$fetchRequest"
+    val fetchMessageConversionsTimeMsMetricName = 
s"$MessageConversionsTimeMs,$fetchRequest"
     val initialFetchMessageConversionsPerSec = 
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
+    val initialFetchMessageConversionsTimeMs = 
TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)
+    val initialFetchTemporaryMemoryBytes = 
TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)
     val topicWithDownConversionEnabled = "foo"
     val topicWithDownConversionDisabled = "bar"
     val replicaIds = brokers.map(_.config.brokerId)
@@ -216,15 +222,28 @@ class FetchRequestDownConversionConfigTest extends 
BaseRequestTest {
       Errors.forCode(fetchResponseData.get(tp).errorCode)
     }
 
+    def verifyMetrics(): Unit = {
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > 
initialFetchMessageConversionsPerSec,
+        s"The `FetchMessageConversionsPerSec` metric count is not incremented 
after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsPerSec final: 
${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName) > 
initialFetchMessageConversionsTimeMs,
+        s"The `MessageConversionsTimeMs` in fetch request metric count is not 
incremented after 5 seconds. " +
+          s"init: $initialFetchMessageConversionsTimeMs final: 
${TestUtils.metersCount(fetchMessageConversionsTimeMsMetricName)}", 5000)
+
+      TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName) > 
initialFetchTemporaryMemoryBytes,
+        s"The `TemporaryMemoryBytes` in fetch request metric count is not 
incremented after 5 seconds. " +
+          s"init: $initialFetchTemporaryMemoryBytes final: 
${TestUtils.metersCount(fetchTemporaryMemoryBytesMetricName)}", 5000)
+    }
+
     assertEquals(Errors.NONE, error(partitionWithDownConversionEnabled))
     if (isFollowerFetch) {
       assertEquals(Errors.NONE, error(partitionWithDownConversionDisabled))
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, 
error(partitionWithDownConversionDisabled))
     }
-    TestUtils.waitUntilTrue(() => 
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > 
initialFetchMessageConversionsPerSec,
-      s"The `FetchMessageConversionsPerSec` metric count is not incremented 
after 5 seconds. " +
-      s"init: $initialFetchMessageConversionsPerSec final: 
${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
+
+    verifyMetrics()
   }
 
   private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ab72839fe22..dae338a3b08 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,7 +28,7 @@ import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, 
Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
@@ -2101,8 +2101,12 @@ object TestUtils extends Logging {
 
   def metersCount(metricName: String): Long = {
     KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .filter { case (k, _) => k.getMBeanName.endsWith(metricName)}
-      .values.map(_.asInstanceOf[Meter].count()).sum
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+      .values.map {
+        case histogram: Histogram => histogram.count()
+        case meter: Meter => meter.count()
+        case _ => 0
+      }.sum
   }
 
   def clearYammerMetrics(): Unit = {

Reply via email to