This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new e59215a  KAFKA-10614: Ensure group state (un)load is executed in the right order (#9441)
e59215a is described below

commit e59215a006e410fb87a4b2ac61765edb031dba40
Author: Tom Bentley <[email protected]>
AuthorDate: Mon Jun 7 14:19:48 2021 +0100

    KAFKA-10614: Ensure group state (un)load is executed in the right order (#9441)
    
    Co-authored-by: Jason Gustafson <[email protected]>
    Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  12 +-
 .../coordinator/group/GroupMetadataManager.scala   | 112 +++++++++++-----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  14 +-
 .../scala/kafka/server/RequestHandlerHelper.scala  |   4 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |   4 +-
 .../group/GroupMetadataManagerTest.scala           | 146 +++++++++++++++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   6 +-
 7 files changed, 228 insertions(+), 70 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 441b46d..0067549 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -906,9 +906,9 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    info(s"Elected as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, 
coordinatorEpoch, onGroupLoaded)
   }
 
   /**
@@ -916,9 +916,9 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: 
Option[Int]): Unit = {
+    info(s"Resigned as the group coordinator for partition 
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+    groupManager.removeGroupsForPartition(offsetTopicPartitionId, 
coordinatorEpoch, onGroupUnloaded)
   }
 
   private def setAndPropagateAssignment(group: GroupMetadata, assignment: 
Map[String, Array[Byte]]): Unit = {
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index fb6d07e..cb03c4c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -21,9 +21,9 @@ import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.util.Optional
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, 
KAFKA_2_3_IV0}
@@ -86,6 +86,9 @@ class GroupMetadataManager(brokerId: Int,
    * We use this structure to quickly find the groups which need to be updated 
by the commit/abort marker. */
   private val openGroupsForProducer = mutable.HashMap[Long, 
mutable.Set[String]]()
 
+  /* Track the epoch in which we (un)loaded group state to detect racing 
LeaderAndIsr requests */
+  private [group] val epochForPartitionId = new ConcurrentHashMap[Int, 
java.lang.Integer]()
+
   /* setup metrics*/
   private val partitionLoadSensor = 
metrics.sensor(GroupMetadataManager.LoadTimeSensor)
 
@@ -526,34 +529,42 @@ class GroupMetadataManager(brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the 
cache
    */
-  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: 
GroupMetadata => Unit): Unit = {
+  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, coordinatorEpoch: 
Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
-    if (addLoadingPartition(offsetsPartition)) {
-      info(s"Scheduling loading of offsets and group metadata from 
$topicPartition")
-      val startTimeMs = time.milliseconds()
-      scheduler.schedule(topicPartition.toString, () => 
loadGroupsAndOffsets(topicPartition, onGroupLoaded, startTimeMs))
-    } else {
-      info(s"Already loading offsets and group metadata from $topicPartition")
-    }
+    info(s"Scheduling loading of offsets and group metadata from 
$topicPartition for epoch $coordinatorEpoch")
+    val startTimeMs = time.milliseconds()
+    scheduler.schedule(topicPartition.toString, () => 
loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded, 
startTimeMs))
   }
 
-  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, 
onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
-    try {
-      val schedulerTimeMs = time.milliseconds() - startTimeMs
-      debug(s"Started loading offsets and group metadata from $topicPartition")
-      doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      val endTimeMs = time.milliseconds()
-      val totalLoadingTimeMs = endTimeMs - startTimeMs
-      partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-      info(s"Finished loading offsets and group metadata from $topicPartition "
-        + s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs 
milliseconds"
-        + s" was spent in the scheduler.")
-    } catch {
-      case t: Throwable => error(s"Error loading offsets from 
$topicPartition", t)
-    } finally {
-      inLock(partitionLock) {
-        ownedPartitions.add(topicPartition.partition)
-        loadingPartitions.remove(topicPartition.partition)
+  private[group] def loadGroupsAndOffsets(
+    topicPartition: TopicPartition,
+    coordinatorEpoch: Int,
+    onGroupLoaded: GroupMetadata => Unit,
+    startTimeMs: java.lang.Long
+  ): Unit = {
+    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, 
Some(coordinatorEpoch))) {
+      info(s"Not loading offsets and group metadata for $topicPartition " +
+        s"in epoch $coordinatorEpoch since current epoch is 
${epochForPartitionId.get(topicPartition.partition)}")
+    } else if (!addLoadingPartition(topicPartition.partition)) {
+      info(s"Already loading offsets and group metadata from $topicPartition")
+    } else {
+      try {
+        val schedulerTimeMs = time.milliseconds() - startTimeMs
+        debug(s"Started loading offsets and group metadata from 
$topicPartition for epoch $coordinatorEpoch")
+        doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
+        val endTimeMs = time.milliseconds()
+        val totalLoadingTimeMs = endTimeMs - startTimeMs
+        partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, 
false)
+        info(s"Finished loading offsets and group metadata from 
$topicPartition "
+          + s"in $totalLoadingTimeMs milliseconds for epoch $coordinatorEpoch, 
of which " +
+          s"$schedulerTimeMs milliseconds was spent in the scheduler.")
+      } catch {
+        case t: Throwable => error(s"Error loading offsets from 
$topicPartition", t)
+      } finally {
+        inLock(partitionLock) {
+          ownedPartitions.add(topicPartition.partition)
+          loadingPartitions.remove(topicPartition.partition)
+        }
       }
     }
   }
@@ -747,20 +758,28 @@ class GroupMetadataManager(brokerId: Int,
    * @param offsetsPartition Groups belonging to this partition of the offsets 
topic will be deleted from the cache.
    */
   def removeGroupsForPartition(offsetsPartition: Int,
+                               coordinatorEpoch: Option[Int],
                                onGroupUnloaded: GroupMetadata => Unit): Unit = 
{
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
     info(s"Scheduling unloading of offsets and group metadata from 
$topicPartition")
-    scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())
+    scheduler.schedule(topicPartition.toString, () => 
removeGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupUnloaded))
+  }
 
-    def removeGroupsAndOffsets(): Unit = {
+  private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
+                                             coordinatorEpoch: Option[Int],
+                                             onGroupUnloaded: GroupMetadata => 
Unit): Unit = {
+    val offsetsPartition = topicPartition.partition
+    if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
       var numOffsetsRemoved = 0
       var numGroupsRemoved = 0
 
-      debug(s"Started unloading offsets and group metadata for 
$topicPartition")
+      debug(s"Started unloading offsets and group metadata for $topicPartition 
for " +
+        s"coordinator epoch $coordinatorEpoch")
       inLock(partitionLock) {
         // we need to guard the group removal in cache in the loading 
partition lock
         // to prevent coordinator's check-and-get-group race condition
         ownedPartitions.remove(offsetsPartition)
+        loadingPartitions.remove(offsetsPartition)
 
         for (group <- groupMetadataCache.values) {
           if (partitionFor(group.groupId) == offsetsPartition) {
@@ -772,12 +791,35 @@ class GroupMetadataManager(brokerId: Int,
           }
         }
       }
-
-      info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved 
cached offsets " +
-        s"and $numGroupsRemoved cached groups.")
+      info(s"Finished unloading $topicPartition for coordinator epoch 
$coordinatorEpoch. " +
+        s"Removed $numOffsetsRemoved cached offsets and $numGroupsRemoved 
cached groups.")
+    } else {
+      info(s"Not removing offsets and group metadata for $topicPartition " +
+        s"in epoch $coordinatorEpoch since current epoch is 
${epochForPartitionId.get(topicPartition.partition)}")
     }
   }
 
+  /**
+   * Update the cached coordinator epoch if the new value is larger than the 
old value.
+   * @return true if `epochOpt` is either empty or contains a value greater 
than or equal to the current epoch
+   */
+  private def maybeUpdateCoordinatorEpoch(
+    partitionId: Int,
+    epochOpt: Option[Int]
+  ): Boolean = {
+    val updatedEpoch = epochForPartitionId.compute(partitionId, (_, 
currentEpoch) => {
+      if (currentEpoch == null) {
+        epochOpt.map(Int.box).orNull
+      } else {
+        epochOpt match {
+          case Some(epoch) if epoch > currentEpoch => epoch
+          case _ => currentEpoch
+        }
+      }
+    })
+    epochOpt.forall(_ == updatedEpoch)
+  }
+
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
     val currentTimestamp = time.milliseconds()
@@ -961,7 +1003,11 @@ class GroupMetadataManager(brokerId: Int,
    */
   private[group] def addLoadingPartition(partition: Int): Boolean = {
     inLock(partitionLock) {
-      loadingPartitions.add(partition)
+      if (ownedPartitions.contains(partition)) {
+        false
+      } else {
+        loadingPartitions.add(partition)
+      }
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 124c5a9..088444d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -286,14 +286,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
       result.forKeyValue { (topicPartition, error) =>
         if (error == Errors.NONE) {
+          val partitionState = partitionStates(topicPartition)
           if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
+              && partitionState.deletePartition) {
+            val leaderEpoch = if (partitionState.leaderEpoch >= 0)
+              Some(partitionState.leaderEpoch)
+            else
+              None
+            groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
           } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
-            val partitionState = partitionStates(topicPartition)
+                     && partitionState.deletePartition) {
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
             txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala 
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index cf2b816..804e888 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -44,14 +44,14 @@ object RequestHandlerHelper {
     // leadership changes
     updatedLeaders.foreach { partition =>
       if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
-        groupCoordinator.onElection(partition.partitionId)
+        groupCoordinator.onElection(partition.partitionId, 
partition.getLeaderEpoch)
       else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
         txnCoordinator.onElection(partition.partitionId, 
partition.getLeaderEpoch)
     }
 
     updatedFollowers.foreach { partition =>
       if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
-        groupCoordinator.onResignation(partition.partitionId)
+        groupCoordinator.onResignation(partition.partitionId, 
Some(partition.getLeaderEpoch))
       else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
         txnCoordinator.onResignation(partition.partitionId, 
Some(partition.getLeaderEpoch))
     }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3e1f820..51aa9ed 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -189,7 +189,9 @@ class GroupCoordinatorTest {
     EasyMock.reset(replicaManager)
     
EasyMock.expect(replicaManager.getLog(otherGroupMetadataTopicPartition)).andReturn(None)
     EasyMock.replay(replicaManager)
-    
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
 group => {}, 0L)
+    // Call removeGroupsAndOffsets so that partition removed from 
loadingPartitions
+    
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
 Some(1), group => {})
+    
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
 1, group => {}, 0L)
     assertEquals(Errors.NONE, 
groupCoordinator.handleDescribeGroup(otherGroupId)._1)
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 45eccbe..60f0822 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -128,6 +128,7 @@ class GroupMetadataManagerTest {
   def testLoadOffsetsWithoutGroup(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -141,7 +142,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -158,6 +159,7 @@ class GroupMetadataManagerTest {
     val generation = 15
     val protocolType = "consumer"
     val startOffset = 15L
+    val groupEpoch = 2
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
       new TopicPartition("foo", 1) -> 455L,
@@ -173,7 +175,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -192,6 +194,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -210,7 +213,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -226,6 +229,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val abortedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -244,7 +248,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     // Since there are no committed offsets for the group, and there is no 
other group metadata, we don't expect the
     // group to be loaded.
@@ -256,6 +260,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val foo0 = new TopicPartition("foo", 0)
     val foo1 = new TopicPartition("foo", 1)
@@ -276,7 +281,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
@@ -297,6 +302,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -323,7 +329,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -342,6 +348,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -378,7 +385,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -411,6 +418,7 @@ class GroupMetadataManagerTest {
     val firstProducerEpoch: Short = 2
     val secondProducerId = 1001L
     val secondProducerEpoch: Short = 3
+    val groupEpoch = 2
 
     val committedOffsetsFirstProducer = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -441,7 +449,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -465,6 +473,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val transactionalOffsetCommits = Map(
       new TopicPartition("foo", 0) -> 23L
@@ -487,7 +496,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
@@ -508,6 +517,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val producerId = 1000L
     val producerEpoch: Short = 2
+    val groupEpoch = 2
 
     val transactionalOffsetCommits = Map(
       new TopicPartition("foo", 0) -> 23L
@@ -529,7 +539,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
@@ -596,6 +606,7 @@ class GroupMetadataManagerTest {
   def testLoadOffsetsWithTombstones(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
+    val groupEpoch = 2
 
     val tombstonePartition = new TopicPartition("foo", 1)
     val committedOffsets = Map(
@@ -613,7 +624,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -629,7 +640,10 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsAndGroup(): Unit = {
-    val groupMetadataTopicPartition = groupTopicPartition
+    loadOffsetsAndGroup(groupTopicPartition, 2)
+  }
+
+  def loadOffsetsAndGroup(groupMetadataTopicPartition: TopicPartition, 
groupEpoch: Int): GroupMetadata = {
     val generation = 935
     val protocolType = "consumer"
     val protocol = "range"
@@ -650,7 +664,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -665,12 +679,92 @@ class GroupMetadataManagerTest {
       assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
       
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
     }
+    group
+  }
+
+  @Test
+  def testLoadOffsetsAndGroupIgnored(): Unit = {
+    val groupEpoch = 2
+    loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch), _ => ())
+    assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+      "Removed group remained in cache")
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch 
- 1, _ => (), 0L)
+    assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+      "Removed group remained in cache")
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+  }
+
+  @Test
+  def testUnloadOffsetsAndGroup(): Unit = {
+    val groupEpoch = 2
+    loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch), _ => ())
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+    assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+    "Removed group remained in cache")
+  }
+
+  @Test
+  def testUnloadOffsetsAndGroupIgnored(): Unit = {
+    val groupEpoch = 2
+    val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, 
Some(groupEpoch - 1), _ => ())
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
+    assertEquals(initiallyLoaded.groupId, group.groupId)
+    assertEquals(initiallyLoaded.currentState, group.currentState)
+    assertEquals(initiallyLoaded.leaderOrNull, group.leaderOrNull)
+    assertEquals(initiallyLoaded.generationId, group.generationId)
+    assertEquals(initiallyLoaded.protocolType, group.protocolType)
+    assertEquals(initiallyLoaded.protocolName.orNull, 
group.protocolName.orNull)
+    assertEquals(initiallyLoaded.allMembers, group.allMembers)
+    assertEquals(initiallyLoaded.allOffsets.size, group.allOffsets.size)
+    initiallyLoaded.allOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition))
+      
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
+    }
+  }
+
+  @Test
+  def testUnloadOffsetsAndGroupIgnoredAfterStopReplica(): Unit = {
+    val groupEpoch = 2
+    val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+    groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, None, _ 
=> ())
+    assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+      "Removed group remained in cache")
+    assertEquals(groupEpoch, 
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),
+    "Replica which was stopped still in epochForPartitionId")
+
+    EasyMock.reset(replicaManager)
+    loadOffsetsAndGroup(groupTopicPartition, groupEpoch + 1)
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
+    assertEquals(initiallyLoaded.groupId, group.groupId)
+    assertEquals(initiallyLoaded.currentState, group.currentState)
+    assertEquals(initiallyLoaded.leaderOrNull, group.leaderOrNull)
+    assertEquals(initiallyLoaded.generationId, group.generationId)
+    assertEquals(initiallyLoaded.protocolType, group.protocolType)
+    assertEquals(initiallyLoaded.protocolName.orNull, 
group.protocolName.orNull)
+    assertEquals(initiallyLoaded.allMembers, group.allMembers)
+    assertEquals(initiallyLoaded.allOffsets.size, group.allOffsets.size)
+    initiallyLoaded.allOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition))
+      
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
+    }
   }
 
   @Test
   def testLoadGroupWithTombstone(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
+    val groupEpoch = 2
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
       protocolType = "consumer", protocol = "range", memberId)
@@ -682,7 +776,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     assertEquals(None, groupMetadataManager.getGroup(groupId))
   }
@@ -691,6 +785,7 @@ class GroupMetadataManagerTest {
   def testLoadGroupWithLargeGroupMetadataRecord(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
+    val groupEpoch = 2
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
       new TopicPartition("foo", 1) -> 455L,
@@ -711,7 +806,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, 
groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new 
AssertionError("Group was not loaded into the cache"))
     committedOffsets.foreach { case (topicPartition, offset) =>
@@ -726,6 +821,7 @@ class GroupMetadataManagerTest {
     // is accidentally corrupted.
     val startOffset = 0L
     val endOffset = 10L
+    val groupEpoch = 2
 
     val logMock: Log = EasyMock.mock(classOf[Log])
     
EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
@@ -735,7 +831,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch, _ => (), 0L)
 
     EasyMock.verify(logMock)
     EasyMock.verify(replicaManager)
@@ -754,6 +850,7 @@ class GroupMetadataManagerTest {
     val protocolType = "consumer"
     val protocol = "range"
     val startOffset = 15L
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -771,7 +868,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -788,6 +885,7 @@ class GroupMetadataManagerTest {
     val protocolType = "consumer"
     val protocol = "range"
     val startOffset = 15L
+    val groupEpoch = 2
     val tp0 = new TopicPartition("foo", 0)
     val tp1 = new TopicPartition("foo", 1)
     val tp2 = new TopicPartition("bar", 0)
@@ -814,7 +912,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(logMock, replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -1982,6 +2080,7 @@ class GroupMetadataManagerTest {
     val protocolType = "consumer"
     val protocol = "range"
     val startOffset = 15L
+    val groupEpoch = 2
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
       new TopicPartition("foo", 1) -> 455L,
@@ -1999,7 +2098,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -2023,6 +2122,7 @@ class GroupMetadataManagerTest {
     val protocolType = "consumer"
     val protocol = "range"
     val startOffset = 15L
+    val groupEpoch = 2
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
       new TopicPartition("foo", 1) -> 455L,
@@ -2039,7 +2139,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -2153,6 +2253,7 @@ class GroupMetadataManagerTest {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L
     val generation = 15
+    val groupEpoch = 2
 
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -2190,7 +2291,7 @@ class GroupMetadataManagerTest {
     EasyMock.replay(logMock)
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
 
     // Empty control batch should not have caused the load to fail
     val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
@@ -2453,7 +2554,8 @@ class GroupMetadataManagerTest {
     // When passed a specific start offset, assert that the measured values are in excess of that.
     val now = time.milliseconds()
     val diff = 1000
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), now - diff)
+    val groupEpoch = 2
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), now - diff)
     assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
     assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f059b42..6f7bb6b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1727,7 +1727,11 @@ class KafkaApisTest {
     }
 
     if (deletePartition) {
-      groupCoordinator.onResignation(groupMetadataPartition.partition)
+      if (leaderEpoch >= 0) {
+        groupCoordinator.onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
+      } else {
+        groupCoordinator.onResignation(groupMetadataPartition.partition, None)
+      }
       EasyMock.expectLastCall()
     }
 

Reply via email to