This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7626a430792 KAFKA-14295 FetchMessageConversionsPerSec meter not
recorded (#13279)
7626a430792 is described below
commit 7626a43079298e88895b6f9e2fe3f8206da0155c
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Fri Feb 24 01:50:49 2023 +0800
KAFKA-14295 FetchMessageConversionsPerSec meter not recorded (#13279)
Reviewers: Luke Chen <[email protected]>
---
.../src/main/java/org/apache/kafka/common/network/NetworkSend.java | 4 ++++
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +++-
.../unit/kafka/server/FetchRequestDownConversionConfigTest.scala | 4 ++++
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 6 ++++++
4 files changed, 17 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 2a51a56932f..f2977b9d9e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -31,6 +31,10 @@ public class NetworkSend implements Send {
return destinationId;
}
+ public Send send() {
+ return send;
+ }
+
@Override
public boolean completed() {
return send.completed();
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7b65bbe50d5..754b6e323fd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,7 +49,7 @@ import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset,
OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, Send}
+import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@@ -893,6 +893,8 @@ class KafkaApis(val requestChannel: RequestChannel,
send.recordConversionStats.asScala.toMap.foreach {
case (tp, stats) => updateRecordConversionStats(request, tp,
stats)
}
+ case send: NetworkSend =>
+ updateConversionStats(send.send())
case _ =>
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index aa8a652741f..3f8ba8e099a 100644
---
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -163,6 +163,7 @@ class FetchRequestDownConversionConfigTest extends
BaseRequestTest {
}
def testV1Fetch(isFollowerFetch: Boolean): Unit = {
+ val initialFetchMessageConversionsPerSec =
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)
val topicWithDownConversionEnabled = "foo"
val topicWithDownConversionDisabled = "bar"
val replicaIds = brokers.map(_.config.brokerId)
@@ -221,6 +222,9 @@ class FetchRequestDownConversionConfigTest extends
BaseRequestTest {
} else {
assertEquals(Errors.UNSUPPORTED_VERSION,
error(partitionWithDownConversionDisabled))
}
+ TestUtils.waitUntilTrue(() =>
TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) >
initialFetchMessageConversionsPerSec,
+ s"The `FetchMessageConversionsPerSec` metric count is not incremented
after 5 seconds. " +
+ s"init: $initialFetchMessageConversionsPerSec final:
${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 5000)
}
private def sendFetch(
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2a71f86a453..ab72839fe22 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2099,6 +2099,12 @@ object TestUtils extends Logging {
.count
}
+ def metersCount(metricName: String): Long = {
+ KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ .filter { case (k, _) => k.getMBeanName.endsWith(metricName)}
+ .values.map(_.asInstanceOf[Meter].count()).sum
+ }
+
def clearYammerMetrics(): Unit = {
for (metricName <-
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)