This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new dc081bb  KAFKA-12660; Do not update offset commit sensor after append 
failure (#10560)
dc081bb is described below

commit dc081bb27cc17294a40a3568613d1fa7023979a1
Author: dengziming <[email protected]>
AuthorDate: Fri Jul 9 01:13:23 2021 +0800

    KAFKA-12660; Do not update offset commit sensor after append failure 
(#10560)
    
    Do not update the commit sensor if the commit fails, and add test logic 
to verify this. The patch also adds 2 unit tests: the first covers the 
`OFFSET_METADATA_TOO_LARGE` error, and the second covers the case where one 
offset is committed and the other fails with `OFFSET_METADATA_TOO_LARGE`. 
Neither of these cases was covered by tests previously.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.scala   | 23 ++---
 .../group/GroupMetadataManagerTest.scala           | 97 ++++++++++++++++++++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  9 +-
 3 files changed, 111 insertions(+), 18 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 78d61ee..9e3769b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -38,7 +38,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.{Metrics, Sensor}
 import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, 
MessageUtil}
 import org.apache.kafka.common.record._
@@ -99,7 +99,7 @@ class GroupMetadataManager(brokerId: Int,
     GroupMetadataManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new 
Avg())
 
-  val offsetCommitsSensor = metrics.sensor("OffsetCommits")
+  val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
 
   offsetCommitsSensor.add(new Meter(
     metrics.metricName("offset-commit-rate",
@@ -109,7 +109,7 @@ class GroupMetadataManager(brokerId: Int,
       "group-coordinator-metrics",
       "The total number of committed offsets")))
 
-  val offsetExpiredSensor = metrics.sensor("OffsetExpired")
+  val offsetExpiredSensor: Sensor = metrics.sensor("OffsetExpired")
 
   offsetExpiredSensor.add(new Meter(
     metrics.metricName("offset-expiration-rate",
@@ -184,9 +184,9 @@ class GroupMetadataManager(brokerId: Int,
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
 
-  def isPartitionOwned(partition: Int) = inLock(partitionLock) { 
ownedPartitions.contains(partition) }
+  def isPartitionOwned(partition: Int): Boolean = inLock(partitionLock) { 
ownedPartitions.contains(partition) }
 
-  def isPartitionLoading(partition: Int) = inLock(partitionLock) { 
loadingPartitions.contains(partition) }
+  def isPartitionLoading(partition: Int): Boolean = inLock(partitionLock) { 
loadingPartitions.contains(partition) }
 
   def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % 
groupMetadataTopicPartitionCount
 
@@ -197,7 +197,7 @@ class GroupMetadataManager(brokerId: Int,
   def isLoading: Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
 
   // return true iff group is owned and the group doesn't exist
-  def groupNotExists(groupId: String) = inLock(partitionLock) {
+  def groupNotExists(groupId: String): Boolean = inLock(partitionLock) {
     isGroupLocal(groupId) && getGroup(groupId).forall { group =>
       group.inLock(group.is(Dead))
     }
@@ -397,9 +397,6 @@ class GroupMetadataManager(brokerId: Int,
               throw new IllegalStateException("Append status %s should only 
have one partition %s"
                 .format(responseStatus, offsetTopicPartition))
 
-            // record the number of offsets committed to the log
-            offsetCommitsSensor.record(records.size)
-
             // construct the commit response status and insert
             // the offset and metadata to cache if the append status has no 
error
             val status = responseStatus(offsetTopicPartition)
@@ -414,6 +411,10 @@ class GroupMetadataManager(brokerId: Int,
                       group.onOffsetCommitAppend(topicPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                   }
                 }
+
+                // Record the number of offsets committed to the log
+                offsetCommitsSensor.record(records.size)
+
                 Errors.NONE
               } else {
                 if (!group.is(Dead)) {
@@ -954,11 +955,11 @@ class GroupMetadataManager(brokerId: Int,
 
   private def groupsBelongingToPartitions(producerId: Long, partitions: 
Set[Int]) = openGroupsForProducer synchronized {
     val (ownedGroups, _) = openGroupsForProducer.getOrElse(producerId, 
mutable.Set.empty[String])
-      .partition { case (group) => partitions.contains(partitionFor(group)) }
+      .partition(group => partitions.contains(partitionFor(group)))
     ownedGroups
   }
 
-  private def removeGroupFromAllProducers(groupId: String) = 
openGroupsForProducer synchronized {
+  private def removeGroupFromAllProducers(groupId: String): Unit = 
openGroupsForProducer synchronized {
     openGroupsForProducer.forKeyValue { (_, groups) =>
       groups.remove(groupId)
     }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d2d3b12..bf475cc 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1045,7 +1045,7 @@ class GroupMetadataManagerTest {
 
     val e = assertThrows(classOf[IllegalStateException],
       () => GroupMetadataManager.readGroupMessageValue(groupId, 
groupMetadataRecordValue, time))
-    assertEquals(s"Unknown group metadata message version: 
${unsupportedVersion}", e.getMessage)
+    assertEquals(s"Unknown group metadata message version: 
$unsupportedVersion", e.getMessage)
   }
 
   @Test
@@ -1260,6 +1260,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
@@ -1277,6 +1278,8 @@ class GroupMetadataManagerTest {
     assertEquals(offset, partitionResponse.offset)
 
     EasyMock.verify(replicaManager)
+    // Will update sensor after commit
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
   @Test
@@ -1462,6 +1465,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@@ -1476,6 +1480,89 @@ class GroupMetadataManagerTest {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition).map(_.offset))
 
     EasyMock.verify(replicaManager)
+    // Will not update sensor if failed
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testCommitOffsetPartialFailure(): Unit = {
+    EasyMock.reset(replicaManager)
+
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val topicPartitionFailed = new TopicPartition("foo", 1)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(
+      topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+      // This offset will fail: its metadata exceeds maxMetadataSize
+      topicPartitionFailed -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+    )
+
+    val capturedResponseCallback = appendAndCaptureCallback()
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+    assertTrue(group.hasOffsets)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
+    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), 
commitErrors.get.get(topicPartitionFailed))
+    assertTrue(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartitionFailed).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testOffsetMetadataTooLarge(): Unit = {
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(
+      topicPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+    )
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+    assertFalse(group.hasOffsets)
+
+    assertFalse(commitErrors.isEmpty)
+    val maybeError = commitErrors.get.get(topicPartition)
+    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
+    assertFalse(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition).map(_.offset))
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
   @Test
@@ -1900,7 +1987,7 @@ class GroupMetadataManagerTest {
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
     // do not expire offsets while within retention period since commit 
timestamp
-    val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + 
defaultOffsetRetentionMs
+    val expiryTimestamp = offsets(topicPartition1).commitTimestamp + 
defaultOffsetRetentionMs
     time.sleep(expiryTimestamp - time.milliseconds() - 1)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -2074,7 +2161,7 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testLoadOffsetFromOldCommit() = {
+  def testLoadOffsetFromOldCommit(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val generation = 935
     val protocolType = "consumer"
@@ -2116,7 +2203,7 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testLoadOffsetWithExplicitRetention() = {
+  def testLoadOffsetWithExplicitRetention(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val generation = 935
     val protocolType = "consumer"
@@ -2391,7 +2478,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
-      override def answer = capturedCallback.getValue.apply(
+      override def answer: Unit = capturedCallback.getValue.apply(
         Map(groupTopicPartition ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f395a3d..51ea34a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.time.Duration
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Properties}
-
 import com.yammer.metrics.core.Meter
+
 import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
@@ -51,6 +51,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, 
UnknownTopicOrPart
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
@@ -1710,7 +1711,11 @@ object TestUtils extends Logging {
   }
 
   def totalMetricValue(server: KafkaServer, metricName: String): Long = {
-    val allMetrics = server.metrics.metrics
+    totalMetricValue(server.metrics, metricName)
+  }
+
+  def totalMetricValue(metrics: Metrics, metricName: String): Long = {
+    val allMetrics = metrics.metrics
     val total = allMetrics.values().asScala.filter(_.metricName().name() == 
metricName)
       .foldLeft(0.0)((total, metric) => total + 
metric.metricValue.asInstanceOf[Double])
     total.toLong

Reply via email to