This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d479d129e0b KAFKA-13999: Add ProducerIdCount metrics (KIP-847) (#13078)
d479d129e0b is described below
commit d479d129e0b24f2c2173f2bfd1fb261ec2be757b
Author: Anastasia Vela <[email protected]>
AuthorDate: Wed Mar 1 14:20:15 2023 -0800
KAFKA-13999: Add ProducerCount metrics (KIP-847) (#13078)
This PR implements KIP-847:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics
It adds a ProducerIdCount metric at the broker level:
kafka.server:type=ReplicaManager,name=ProducerIdCount
Unit tests were added (see below) to verify that the metric reports the count correctly.
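For illustration (not part of this commit): the gauge is registered through the broker's Yammer metrics, which surface over JMX, so it can be read with standard JMX tooling. A minimal Scala sketch, assuming an in-process MBean server and that the gauge exposes its reading via Yammer's usual "Value" attribute:

import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Minimal sketch: read the new gauge from the platform MBean server.
// Assumes the broker runs in (or is attached to) this JVM; for a remote
// broker, obtain an MBeanServerConnection through a JMXConnector instead.
object ProducerIdCountProbe extends App {
  val server = ManagementFactory.getPlatformMBeanServer
  val name = new ObjectName("kafka.server:type=ReplicaManager,name=ProducerIdCount")
  // Yammer gauges expose their current reading as the "Value" attribute.
  println(s"ProducerIdCount = ${server.getAttribute(name, "Value")}")
}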
---------
Co-authored-by: Artem Livshits <[email protected]>
Reviewers: Ismael Juma <[email protected]>, Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>, Alexandre Dupriez <[email protected]>, Justine Olshan <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 5 ++
core/src/main/scala/kafka/log/UnifiedLog.scala | 13 +++-
.../main/scala/kafka/server/ReplicaManager.scala | 4 +
.../unit/kafka/server/BrokerMetricNamesTest.scala | 1 +
.../unit/kafka/server/ReplicaManagerTest.scala | 85 ++++++++++++++++++++++
docs/ops.html | 5 ++
.../internals/log/ProducerStateManager.java | 32 ++++++--
7 files changed, 137 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d9838ff819e..68118ca196c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -357,6 +357,11 @@ class Partition(val topicPartition: TopicPartition,
def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId)
+ def producerIdCount: Int = log.map(_.producerIdCount).getOrElse(0)
+
+ // Visible for testing
+ def removeExpiredProducers(currentTimeMs: Long): Unit = log.foreach(_.removeExpiredProducers(currentTimeMs))
+
def inSyncReplicaIds: Set[Int] = partitionState.isr
def maybeAddListener(listener: PartitionListener): Boolean = {
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 6c220ffbc86..28cb98bad06 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -470,11 +470,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
- val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => {
+ val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
+ producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+
+ // Visible for testing
+ def removeExpiredProducers(currentTimeMs: Long): Unit = {
lock synchronized {
- producerStateManager.removeExpiredProducers(time.milliseconds)
+ producerStateManager.removeExpiredProducers(currentTimeMs)
}
- }, producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+ }
// For compatibility, metrics are defined to be under `Log` class
override def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
@@ -563,6 +567,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.hasLateTransaction(currentTimeMs)
}
+ @threadsafe
+ def producerIdCount: Int = producerStateManager.producerIdCount
+
def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
lock synchronized {
producerStateManager.activeProducers.asScala.map { case (producerId, state) =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 597965d2d5c..b987db340c9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -255,6 +255,7 @@ class ReplicaManager(val config: KafkaConfig,
newGauge("AtMinIsrPartitionCount", () =>
leaderPartitionsIterator.count(_.isAtMinIsr))
newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
+ newGauge("ProducerIdCount", () => producerIdCount)
def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)
@@ -263,6 +264,8 @@ class ReplicaManager(val config: KafkaConfig,
leaderPartitionsIterator.count(_.hasLateTransaction(currentTimeMs))
}
+ def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum
+
val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
@@ -1926,6 +1929,7 @@ class ReplicaManager(val config: KafkaConfig,
removeMetric("AtMinIsrPartitionCount")
removeMetric("ReassigningPartitions")
removeMetric("PartitionsWithLateTransactionsCount")
+ removeMetric("ProducerIdCount")
}
def beginControlledShutdown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index dc69076619d..1e3adc30ee2 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -48,6 +48,7 @@ class BrokerMetricNamesTest(cluster: ClusterInstance) {
"LeaderCount", "PartitionCount", "OfflineReplicaCount",
"UnderReplicatedPartitions",
"UnderMinIsrPartitionCount", "AtMinIsrPartitionCount",
"ReassigningPartitions",
"IsrExpandsPerSec", "IsrShrinksPerSec", "FailedIsrUpdatesPerSec",
+ "ProducerIdCount",
)
expectedMetricNames.foreach { metricName =>
assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == s"$expectedPrefix=$metricName"))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f8d3ae3d490..91387ea315b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -59,12 +59,14 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import com.yammer.metrics.core.Gauge
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.ArgumentMatchers
@@ -462,6 +464,89 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testProducerIdCountMetrics(): Unit = {
+ val timer = new MockTimer(time)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
+
+ try {
+ val brokerList = Seq[Integer](0, 1).asJava
+
+ // Create a couple of partitions for the topic.
+ val partition0 = replicaManager.createPartition(new TopicPartition(topic, 0))
+ partition0.createLogIfNotExists(isNew = false, isFutureReplica = false,
+ new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+ val partition1 = replicaManager.createPartition(new TopicPartition(topic, 1))
+ partition1.createLogIfNotExists(isNew = false, isFutureReplica = false,
+ new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+
+ // Make this replica the leader for the partitions.
+ Seq(0, 1).foreach { partition =>
+ val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(partition)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setIsr(brokerList)
+ .setPartitionEpoch(0)
+ .setReplicas(brokerList)
+ .setIsNew(true)).asJava,
+ Collections.singletonMap(topic, Uuid.randomUuid()),
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
+ false).build()
+ replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
+ .localLogOrException
+ }
+
+ def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = {
+ val epoch = 42.toShort
+ val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, pid, epoch, sequence,
+ new SimpleRecord(s"message $sequence".getBytes))
+ appendRecords(replicaManager, new TopicPartition(topic, partition), records).onFire { response =>
+ assertEquals(Errors.NONE, response.error)
+ }
+ }
+
+ def replicaManagerMetricValue(): Int = {
+ KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case (metricName, _) =>
+ metricName.getName == "ProducerIdCount" && metricName.getType == replicaManager.getClass.getSimpleName
+ }.head._2.asInstanceOf[Gauge[Int]].value
+ }
+
+ // Initially all metrics are 0.
+ assertEquals(0, replicaManagerMetricValue())
+
+ val pid1 = 123L
+ // Produce a record from 1st pid to 1st partition.
+ appendRecord(pid1, 0, 0)
+ assertEquals(1, replicaManagerMetricValue())
+
+ // Produce another record from 1st pid to 1st partition, metrics shouldn't change.
+ appendRecord(pid1, 1, 0)
+ assertEquals(1, replicaManagerMetricValue())
+
+ // Produce a record from 2nd pid to 1st partition
+ val pid2 = 456L
+ appendRecord(pid2, 1, 0)
+ assertEquals(2, replicaManagerMetricValue())
+
+ // Produce a record from 1st pid to 2nd partition
+ appendRecord(pid1, 0, 1)
+ assertEquals(3, replicaManagerMetricValue())
+
+ // Simulate producer id expiration.
+ // We pass Long.MaxValue - 1 because the record timestamp in this test is -1,
+ // so when the expiration check subtracts the timestamp it gets exactly Long.MaxValue.
+ partition0.removeExpiredProducers(Long.MaxValue - 1)
+ assertEquals(1, replicaManagerMetricValue())
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
@Test
def testPartitionsWithLateTransactionsCount(): Unit = {
val timer = new MockTimer(time)
diff --git a/docs/ops.html b/docs/ops.html
index 96ea6d0e71a..8adeeca75cd 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1604,6 +1604,11 @@ $ bin/kafka-acls.sh \
<td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
<td>0</td>
</tr>
+ <tr>
+ <td>Producer Id counts</td>
+ <td>kafka.server:type=ReplicaManager,name=ProducerIdCount</td>
+ <td>Count of all producer ids created by transactional and idempotent producers in each replica on the broker</td>
+ </tr>
<tr>
<td>Partition counts</td>
<td>kafka.server:type=ReplicaManager,name=PartitionCount</td>
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index efa1a6e63dc..774e5be1ac1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -121,6 +121,9 @@ public class ProducerStateManager {
private volatile File logDir;
+ // The same as producers.size, but for lock-free access.
+ private volatile int producerIdCount = 0;
+
// Keep track of the last timestamp from the oldest transaction. This is used
// to detect (approximately) when a transaction has been left hanging on a partition.
// We make the field volatile so that it can be safely accessed without a lock.
@@ -162,6 +165,25 @@ public class ProducerStateManager {
snapshots = loadSnapshots();
}
+ public int producerIdCount() {
+ return producerIdCount;
+ }
+
+ private void addProducerId(long producerId, ProducerStateEntry entry) {
+ producers.put(producerId, entry);
+ producerIdCount = producers.size();
+ }
+
+ private void removeProducerIds(List<Long> keys) {
+ producers.keySet().removeAll(keys);
+ producerIdCount = producers.size();
+ }
+
+ private void clearProducerIds() {
+ producers.clear();
+ producerIdCount = 0;
+ }
+
/**
* Load producer state snapshots by scanning the logDir.
*/
@@ -306,7 +328,7 @@ public class ProducerStateManager {
// Visible for testing
public void loadProducerEntry(ProducerStateEntry entry) {
long producerId = entry.producerId();
- producers.put(producerId, entry);
+ addProducerId(producerId, entry);
entry.currentTxnFirstOffset().ifPresent(offset -> ongoingTxns.put(offset, new TxnMetadata(producerId, offset)));
}
@@ -322,7 +344,7 @@ public class ProducerStateManager {
.filter(entry -> isProducerExpired(currentTimeMs, entry.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
- producers.keySet().removeAll(keys);
+ removeProducerIds(keys);
}
/**
@@ -342,7 +364,7 @@ public class ProducerStateManager {
}
if (logEndOffset != mapEndOffset()) {
- producers.clear();
+ clearProducerIds();
ongoingTxns.clear();
updateOldestTxnTimestamp();
@@ -374,7 +396,7 @@ public class ProducerStateManager {
if (currentEntry != null) {
currentEntry.update(updatedEntry);
} else {
- producers.put(appendInfo.producerId(), updatedEntry);
+ addProducerId(appendInfo.producerId(), updatedEntry);
}
appendInfo.startedTransactions().forEach(txn -> ongoingTxns.put(txn.firstOffset.messageOffset, txn));
@@ -479,7 +501,7 @@ public class ProducerStateManager {
* Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping.
*/
public void truncateFullyAndStartAt(long offset) throws IOException {
- producers.clear();
+ clearProducerIds();
ongoingTxns.clear();
unreplicatedTxns.clear();
for (SnapshotFile snapshotFile : snapshots.values()) {
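
A note on the volatile counter above: writers mutate the producers map while holding the log lock and refresh producerIdCount, so the ReplicaManager gauge can read the count from the metrics thread without contending for that lock. A minimal Scala sketch of the pattern (hypothetical CountedMap class, not the actual ProducerStateManager):

import scala.collection.mutable

// Hypothetical illustration of the lock-free-read counter pattern:
// all writes happen under a lock and refresh a volatile snapshot of the
// size, so readers on other threads (e.g. a metrics gauge) never block.
class CountedMap[K, V] {
  private val entries = mutable.Map.empty[K, V]
  @volatile private var count = 0

  def put(key: K, value: V): Unit = synchronized {
    entries.put(key, value)
    count = entries.size
  }

  def removeAll(keys: Iterable[K]): Unit = synchronized {
    keys.foreach(entries.remove)
    count = entries.size
  }

  def clear(): Unit = synchronized {
    entries.clear()
    count = 0
  }

  // Safe to call from any thread without the lock; may lag a concurrent write.
  def size: Int = count
}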