This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new dc081bb KAFKA-12660; Do not update offset commit sensor after append failure (#10560)
dc081bb is described below
commit dc081bb27cc17294a40a3568613d1fa7023979a1
Author: dengziming <[email protected]>
AuthorDate: Fri Jul 9 01:13:23 2021 +0800
KAFKA-12660; Do not update offset commit sensor after append failure (#10560)
Do not update the offset commit sensor if the append to the log failed.
The patch also adds two unit tests: the first covers the `OFFSET_METADATA_TOO_LARGE`
error, and the second covers the case where one offset is committed successfully
while the other fails with `OFFSET_METADATA_TOO_LARGE`. Neither case was covered
previously.
Reviewers: Jason Gustafson <[email protected]>
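
For context, here is a minimal, self-contained sketch (not part of the patch; the
OffsetCommitSensorSketch object and the onAppendComplete helper are hypothetical)
showing the intended behavior: a sensor built like the coordinator's "OffsetCommits"
sensor is recorded only when the append succeeds, so failed commits no longer
inflate offset-commit-count.

import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.Meter

import scala.jdk.CollectionConverters._

object OffsetCommitSensorSketch {
  def main(args: Array[String]): Unit = {
    val metrics = new Metrics()
    val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
    offsetCommitsSensor.add(new Meter(
      metrics.metricName("offset-commit-rate", "group-coordinator-metrics",
        "The rate of committed offsets"),
      metrics.metricName("offset-commit-count", "group-coordinator-metrics",
        "The total number of committed offsets")))

    // Hypothetical stand-in for the log-append callback: record the sensor only
    // on success, mirroring where the patch moves offsetCommitsSensor.record(...).
    def onAppendComplete(appendSucceeded: Boolean, numOffsets: Int): Unit = {
      if (appendSucceeded)
        offsetCommitsSensor.record(numOffsets)
    }

    onAppendComplete(appendSucceeded = false, numOffsets = 2) // failed append: not counted
    onAppendComplete(appendSucceeded = true, numOffsets = 1)  // successful append: counted

    // Sum the count metric the same way TestUtils.totalMetricValue does in the diff.
    val total = metrics.metrics.asScala.values
      .filter(_.metricName.name == "offset-commit-count")
      .map(_.metricValue.asInstanceOf[Double])
      .sum
    println(s"offset-commit-count = $total") // expected: 1.0

    metrics.close()
  }
}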
---
.../coordinator/group/GroupMetadataManager.scala | 23 ++---
.../group/GroupMetadataManagerTest.scala | 97 ++++++++++++++++++++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 9 +-
3 files changed, 111 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 78d61ee..9e3769b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -38,7 +38,7 @@ import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
import org.apache.kafka.common.record._
@@ -99,7 +99,7 @@ class GroupMetadataManager(brokerId: Int,
GroupMetadataManager.MetricsGroup,
"The avg time it took to load the partitions in the last 30sec"), new
Avg())
- val offsetCommitsSensor = metrics.sensor("OffsetCommits")
+ val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
offsetCommitsSensor.add(new Meter(
metrics.metricName("offset-commit-rate",
@@ -109,7 +109,7 @@ class GroupMetadataManager(brokerId: Int,
"group-coordinator-metrics",
"The total number of committed offsets")))
- val offsetExpiredSensor = metrics.sensor("OffsetExpired")
+ val offsetExpiredSensor: Sensor = metrics.sensor("OffsetExpired")
offsetExpiredSensor.add(new Meter(
metrics.metricName("offset-expiration-rate",
@@ -184,9 +184,9 @@ class GroupMetadataManager(brokerId: Int,
def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
- def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+ def isPartitionOwned(partition: Int): Boolean = inLock(partitionLock) { ownedPartitions.contains(partition) }
- def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
+ def isPartitionLoading(partition: Int): Boolean = inLock(partitionLock) { loadingPartitions.contains(partition) }
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
@@ -197,7 +197,7 @@ class GroupMetadataManager(brokerId: Int,
def isLoading: Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
// return true iff group is owned and the group doesn't exist
- def groupNotExists(groupId: String) = inLock(partitionLock) {
+ def groupNotExists(groupId: String): Boolean = inLock(partitionLock) {
isGroupLocal(groupId) && getGroup(groupId).forall { group =>
group.inLock(group.is(Dead))
}
@@ -397,9 +397,6 @@ class GroupMetadataManager(brokerId: Int,
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
- // record the number of offsets committed to the log
- offsetCommitsSensor.record(records.size)
-
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
@@ -414,6 +411,10 @@ class GroupMetadataManager(brokerId: Int,
group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
+
+ // Record the number of offsets committed to the log
+ offsetCommitsSensor.record(records.size)
+
Errors.NONE
} else {
if (!group.is(Dead)) {
@@ -954,11 +955,11 @@ class GroupMetadataManager(brokerId: Int,
private def groupsBelongingToPartitions(producerId: Long, partitions: Set[Int]) = openGroupsForProducer synchronized {
val (ownedGroups, _) = openGroupsForProducer.getOrElse(producerId, mutable.Set.empty[String])
- .partition { case (group) => partitions.contains(partitionFor(group)) }
+ .partition(group => partitions.contains(partitionFor(group)))
ownedGroups
}
- private def removeGroupFromAllProducers(groupId: String) = openGroupsForProducer synchronized {
+ private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized {
openGroupsForProducer.forKeyValue { (_, groups) =>
groups.remove(groupId)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d2d3b12..bf475cc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1045,7 +1045,7 @@ class GroupMetadataManagerTest {
val e = assertThrows(classOf[IllegalStateException],
() => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
- assertEquals(s"Unknown group metadata message version:
${unsupportedVersion}", e.getMessage)
+ assertEquals(s"Unknown group metadata message version:
$unsupportedVersion", e.getMessage)
}
@Test
@@ -1260,6 +1260,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertTrue(group.hasOffsets)
@@ -1277,6 +1278,8 @@ class GroupMetadataManagerTest {
assertEquals(offset, partitionResponse.offset)
EasyMock.verify(replicaManager)
+ // Will update sensor after commit
+ assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@Test
@@ -1462,6 +1465,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertTrue(group.hasOffsets)
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@@ -1476,6 +1480,89 @@ class GroupMetadataManagerTest {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
EasyMock.verify(replicaManager)
+ // Will not update sensor if failed
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+ }
+
+ @Test
+ def testCommitOffsetPartialFailure(): Unit = {
+ EasyMock.reset(replicaManager)
+
+ val memberId = ""
+ val topicPartition = new TopicPartition("foo", 0)
+ val topicPartitionFailed = new TopicPartition("foo", 1)
+ val offset = 37
+
+ groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+ val group = new GroupMetadata(groupId, Empty, time)
+ groupMetadataManager.addGroup(group)
+
+ val offsets = immutable.Map(
+ topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+ // This will fail
+ topicPartitionFailed -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+ )
+
+ val capturedResponseCallback = appendAndCaptureCallback()
+ EasyMock.replay(replicaManager)
+
+ var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ commitErrors = Some(errors)
+ }
+
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+ groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+ assertTrue(group.hasOffsets)
+ capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+ new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+ assertFalse(commitErrors.isEmpty)
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
+ assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), commitErrors.get.get(topicPartitionFailed))
+ assertTrue(group.hasOffsets)
+
+ val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
+ assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartitionFailed).map(_.offset))
+
+ EasyMock.verify(replicaManager)
+ assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+ }
+
+ @Test
+ def testOffsetMetadataTooLarge(): Unit = {
+ val memberId = ""
+ val topicPartition = new TopicPartition("foo", 0)
+ val offset = 37
+
+ groupMetadataManager.addPartitionOwnership(groupPartitionId)
+ val group = new GroupMetadata(groupId, Empty, time)
+ groupMetadataManager.addGroup(group)
+
+ val offsets = immutable.Map(
+ topicPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+ )
+
+ var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ commitErrors = Some(errors)
+ }
+
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+ groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+ assertFalse(group.hasOffsets)
+
+ assertFalse(commitErrors.isEmpty)
+ val maybeError = commitErrors.get.get(topicPartition)
+ assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
+ assertFalse(group.hasOffsets)
+
+ val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition)))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+
+ assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@Test
@@ -1900,7 +1987,7 @@ class GroupMetadataManagerTest {
assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
// do not expire offsets while within retention period since commit timestamp
- val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
+ val expiryTimestamp = offsets(topicPartition1).commitTimestamp + defaultOffsetRetentionMs
time.sleep(expiryTimestamp - time.milliseconds() - 1)
groupMetadataManager.cleanupGroupMetadata()
@@ -2074,7 +2161,7 @@ class GroupMetadataManagerTest {
}
@Test
- def testLoadOffsetFromOldCommit() = {
+ def testLoadOffsetFromOldCommit(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
@@ -2116,7 +2203,7 @@ class GroupMetadataManagerTest {
}
@Test
- def testLoadOffsetWithExplicitRetention() = {
+ def testLoadOffsetWithExplicitRetention(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
@@ -2391,7 +2478,7 @@ class GroupMetadataManagerTest {
EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
- override def answer = capturedCallback.getValue.apply(
+ override def answer: Unit = capturedCallback.getValue.apply(
Map(groupTopicPartition ->
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f395a3d..51ea34a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Properties}
-
import com.yammer.metrics.core.Meter
+
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
@@ -51,6 +51,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
@@ -1710,7 +1711,11 @@ object TestUtils extends Logging {
}
def totalMetricValue(server: KafkaServer, metricName: String): Long = {
- val allMetrics = server.metrics.metrics
+ totalMetricValue(server.metrics, metricName)
+ }
+
+ def totalMetricValue(metrics: Metrics, metricName: String): Long = {
+ val allMetrics = metrics.metrics
val total = allMetrics.values().asScala.filter(_.metricName().name() == metricName)
.foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
total.toLong