This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new e59215a KAFKA-10614: Ensure group state (un)load is executed in the
right order (#9441)
e59215a is described below
commit e59215a006e410fb87a4b2ac61765edb031dba40
Author: Tom Bentley <[email protected]>
AuthorDate: Mon Jun 7 14:19:48 2021 +0100
KAFKA-10614: Ensure group state (un)load is executed in the right order
(#9441)
Co-authored-by: Jason Gustafson <[email protected]>
Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang
<[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 12 +-
.../coordinator/group/GroupMetadataManager.scala | 112 +++++++++++-----
core/src/main/scala/kafka/server/KafkaApis.scala | 14 +-
.../scala/kafka/server/RequestHandlerHelper.scala | 4 +-
.../coordinator/group/GroupCoordinatorTest.scala | 4 +-
.../group/GroupMetadataManagerTest.scala | 146 +++++++++++++++++----
.../scala/unit/kafka/server/KafkaApisTest.scala | 6 +-
7 files changed, 228 insertions(+), 70 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 441b46d..0067549 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -906,9 +906,9 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are now leading
*/
- def onElection(offsetTopicPartitionId: Int): Unit = {
- info(s"Elected as the group coordinator for partition
$offsetTopicPartitionId")
- groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId,
onGroupLoaded)
+ def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+ info(s"Elected as the group coordinator for partition
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+ groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId,
coordinatorEpoch, onGroupLoaded)
}
/**
@@ -916,9 +916,9 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are no longer leading
*/
- def onResignation(offsetTopicPartitionId: Int): Unit = {
- info(s"Resigned as the group coordinator for partition
$offsetTopicPartitionId")
- groupManager.removeGroupsForPartition(offsetTopicPartitionId,
onGroupUnloaded)
+ def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch:
Option[Int]): Unit = {
+ info(s"Resigned as the group coordinator for partition
$offsetTopicPartitionId in epoch $coordinatorEpoch")
+ groupManager.removeGroupsForPartition(offsetTopicPartitionId,
coordinatorEpoch, onGroupUnloaded)
}
private def setAndPropagateAssignment(group: GroupMetadata, assignment:
Map[String, Array[Byte]]): Unit = {
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index fb6d07e..cb03c4c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -21,9 +21,9 @@ import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Optional
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1,
KAFKA_2_3_IV0}
@@ -86,6 +86,9 @@ class GroupMetadataManager(brokerId: Int,
* We use this structure to quickly find the groups which need to be updated
by the commit/abort marker. */
private val openGroupsForProducer = mutable.HashMap[Long,
mutable.Set[String]]()
+ /* Track the epoch in which we (un)loaded group state to detect racing
LeaderAndIsr requests */
+ private [group] val epochForPartitionId = new ConcurrentHashMap[Int,
java.lang.Integer]()
+
/* setup metrics*/
private val partitionLoadSensor =
metrics.sensor(GroupMetadataManager.LoadTimeSensor)
@@ -526,34 +529,42 @@ class GroupMetadataManager(brokerId: Int,
/**
* Asynchronously read the partition from the offsets topic and populate the
cache
*/
- def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded:
GroupMetadata => Unit): Unit = {
+ def scheduleLoadGroupAndOffsets(offsetsPartition: Int, coordinatorEpoch:
Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
offsetsPartition)
- if (addLoadingPartition(offsetsPartition)) {
- info(s"Scheduling loading of offsets and group metadata from
$topicPartition")
- val startTimeMs = time.milliseconds()
- scheduler.schedule(topicPartition.toString, () =>
loadGroupsAndOffsets(topicPartition, onGroupLoaded, startTimeMs))
- } else {
- info(s"Already loading offsets and group metadata from $topicPartition")
- }
+ info(s"Scheduling loading of offsets and group metadata from
$topicPartition for epoch $coordinatorEpoch")
+ val startTimeMs = time.milliseconds()
+ scheduler.schedule(topicPartition.toString, () =>
loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded,
startTimeMs))
}
- private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition,
onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
- try {
- val schedulerTimeMs = time.milliseconds() - startTimeMs
- debug(s"Started loading offsets and group metadata from $topicPartition")
- doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
- val endTimeMs = time.milliseconds()
- val totalLoadingTimeMs = endTimeMs - startTimeMs
- partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
- info(s"Finished loading offsets and group metadata from $topicPartition "
- + s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs
milliseconds"
- + s" was spent in the scheduler.")
- } catch {
- case t: Throwable => error(s"Error loading offsets from
$topicPartition", t)
- } finally {
- inLock(partitionLock) {
- ownedPartitions.add(topicPartition.partition)
- loadingPartitions.remove(topicPartition.partition)
+ private[group] def loadGroupsAndOffsets(
+ topicPartition: TopicPartition,
+ coordinatorEpoch: Int,
+ onGroupLoaded: GroupMetadata => Unit,
+ startTimeMs: java.lang.Long
+ ): Unit = {
+ if (!maybeUpdateCoordinatorEpoch(topicPartition.partition,
Some(coordinatorEpoch))) {
+ info(s"Not loading offsets and group metadata for $topicPartition " +
+ s"in epoch $coordinatorEpoch since current epoch is
${epochForPartitionId.get(topicPartition.partition)}")
+ } else if (!addLoadingPartition(topicPartition.partition)) {
+ info(s"Already loading offsets and group metadata from $topicPartition")
+ } else {
+ try {
+ val schedulerTimeMs = time.milliseconds() - startTimeMs
+ debug(s"Started loading offsets and group metadata from
$topicPartition for epoch $coordinatorEpoch")
+ doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
+ val endTimeMs = time.milliseconds()
+ val totalLoadingTimeMs = endTimeMs - startTimeMs
+ partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs,
false)
+ info(s"Finished loading offsets and group metadata from
$topicPartition "
+ + s"in $totalLoadingTimeMs milliseconds for epoch $coordinatorEpoch,
of which " +
+ s"$schedulerTimeMs milliseconds was spent in the scheduler.")
+ } catch {
+ case t: Throwable => error(s"Error loading offsets from
$topicPartition", t)
+ } finally {
+ inLock(partitionLock) {
+ ownedPartitions.add(topicPartition.partition)
+ loadingPartitions.remove(topicPartition.partition)
+ }
}
}
}
@@ -747,20 +758,28 @@ class GroupMetadataManager(brokerId: Int,
* @param offsetsPartition Groups belonging to this partition of the offsets
topic will be deleted from the cache.
*/
def removeGroupsForPartition(offsetsPartition: Int,
+ coordinatorEpoch: Option[Int],
onGroupUnloaded: GroupMetadata => Unit): Unit =
{
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from
$topicPartition")
- scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())
+ scheduler.schedule(topicPartition.toString, () =>
removeGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupUnloaded))
+ }
- def removeGroupsAndOffsets(): Unit = {
+ private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
+ coordinatorEpoch: Option[Int],
+ onGroupUnloaded: GroupMetadata =>
Unit): Unit = {
+ val offsetsPartition = topicPartition.partition
+ if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
var numOffsetsRemoved = 0
var numGroupsRemoved = 0
- debug(s"Started unloading offsets and group metadata for
$topicPartition")
+ debug(s"Started unloading offsets and group metadata for $topicPartition
for " +
+ s"coordinator epoch $coordinatorEpoch")
inLock(partitionLock) {
// we need to guard the group removal in cache in the loading
partition lock
// to prevent coordinator's check-and-get-group race condition
ownedPartitions.remove(offsetsPartition)
+ loadingPartitions.remove(offsetsPartition)
for (group <- groupMetadataCache.values) {
if (partitionFor(group.groupId) == offsetsPartition) {
@@ -772,12 +791,35 @@ class GroupMetadataManager(brokerId: Int,
}
}
}
-
- info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved
cached offsets " +
- s"and $numGroupsRemoved cached groups.")
+ info(s"Finished unloading $topicPartition for coordinator epoch
$coordinatorEpoch. " +
+ s"Removed $numOffsetsRemoved cached offsets and $numGroupsRemoved
cached groups.")
+ } else {
+ info(s"Not removing offsets and group metadata for $topicPartition " +
+ s"in epoch $coordinatorEpoch since current epoch is
${epochForPartitionId.get(topicPartition.partition)}")
}
}
+ /**
+ * Update the cached coordinator epoch if the new value is larger than the
old value.
+ * @return true if `epochOpt` is either empty or contains a value greater
than or equal to the current epoch
+ */
+ private def maybeUpdateCoordinatorEpoch(
+ partitionId: Int,
+ epochOpt: Option[Int]
+ ): Boolean = {
+ val updatedEpoch = epochForPartitionId.compute(partitionId, (_,
currentEpoch) => {
+ if (currentEpoch == null) {
+ epochOpt.map(Int.box).orNull
+ } else {
+ epochOpt match {
+ case Some(epoch) if epoch > currentEpoch => epoch
+ case _ => currentEpoch
+ }
+ }
+ })
+ epochOpt.forall(_ == updatedEpoch)
+ }
+
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
val currentTimestamp = time.milliseconds()
@@ -961,7 +1003,11 @@ class GroupMetadataManager(brokerId: Int,
*/
private[group] def addLoadingPartition(partition: Int): Boolean = {
inLock(partitionLock) {
- loadingPartitions.add(partition)
+ if (ownedPartitions.contains(partition)) {
+ false
+ } else {
+ loadingPartitions.add(partition)
+ }
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 124c5a9..088444d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -286,14 +286,18 @@ class KafkaApis(val requestChannel: RequestChannel,
// cannot rely on the LeaderAndIsr API for this since it is only sent to
active replicas.
result.forKeyValue { (topicPartition, error) =>
if (error == Errors.NONE) {
+ val partitionState = partitionStates(topicPartition)
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
- && partitionStates(topicPartition).deletePartition) {
- groupCoordinator.onResignation(topicPartition.partition)
+ && partitionState.deletePartition) {
+ val leaderEpoch = if (partitionState.leaderEpoch >= 0)
+ Some(partitionState.leaderEpoch)
+ else
+ None
+ groupCoordinator.onResignation(topicPartition.partition,
leaderEpoch)
} else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
- && partitionStates(topicPartition).deletePartition) {
- val partitionState = partitionStates(topicPartition)
+ && partitionState.deletePartition) {
val leaderEpoch = if (partitionState.leaderEpoch >= 0)
- Some(partitionState.leaderEpoch)
+ Some(partitionState.leaderEpoch)
else
None
txnCoordinator.onResignation(topicPartition.partition,
coordinatorEpoch = leaderEpoch)
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index cf2b816..804e888 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -44,14 +44,14 @@ object RequestHandlerHelper {
// leadership changes
updatedLeaders.foreach { partition =>
if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
- groupCoordinator.onElection(partition.partitionId)
+ groupCoordinator.onElection(partition.partitionId,
partition.getLeaderEpoch)
else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onElection(partition.partitionId,
partition.getLeaderEpoch)
}
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
- groupCoordinator.onResignation(partition.partitionId)
+ groupCoordinator.onResignation(partition.partitionId,
Some(partition.getLeaderEpoch))
else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onResignation(partition.partitionId,
Some(partition.getLeaderEpoch))
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3e1f820..51aa9ed 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -189,7 +189,9 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getLog(otherGroupMetadataTopicPartition)).andReturn(None)
EasyMock.replay(replicaManager)
-
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
group => {}, 0L)
+ // Call removeGroupsAndOffsets so that partition removed from
loadingPartitions
+
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition,
Some(1), group => {})
+
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition,
1, group => {}, 0L)
assertEquals(Errors.NONE,
groupCoordinator.handleDescribeGroup(otherGroupId)._1)
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 45eccbe..60f0822 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -128,6 +128,7 @@ class GroupMetadataManagerTest {
def testLoadOffsetsWithoutGroup(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -141,7 +142,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -158,6 +159,7 @@ class GroupMetadataManagerTest {
val generation = 15
val protocolType = "consumer"
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
@@ -173,7 +175,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -192,6 +194,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -210,7 +213,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -226,6 +229,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val abortedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -244,7 +248,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
// Since there are no committed offsets for the group, and there is no
other group metadata, we don't expect the
// group to be loaded.
@@ -256,6 +260,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
@@ -276,7 +281,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
// The group should be loaded with pending offsets.
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
@@ -297,6 +302,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -323,7 +329,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -342,6 +348,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -378,7 +385,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -411,6 +418,7 @@ class GroupMetadataManagerTest {
val firstProducerEpoch: Short = 2
val secondProducerId = 1001L
val secondProducerEpoch: Short = 3
+ val groupEpoch = 2
val committedOffsetsFirstProducer = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -441,7 +449,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -465,6 +473,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val transactionalOffsetCommits = Map(
new TopicPartition("foo", 0) -> 23L
@@ -487,7 +496,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
// The group should be loaded with pending offsets.
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
@@ -508,6 +517,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val producerId = 1000L
val producerEpoch: Short = 2
+ val groupEpoch = 2
val transactionalOffsetCommits = Map(
new TopicPartition("foo", 0) -> 23L
@@ -529,7 +539,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
// The group should be loaded with pending offsets.
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
@@ -596,6 +606,7 @@ class GroupMetadataManagerTest {
def testLoadOffsetsWithTombstones(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
+ val groupEpoch = 2
val tombstonePartition = new TopicPartition("foo", 1)
val committedOffsets = Map(
@@ -613,7 +624,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -629,7 +640,10 @@ class GroupMetadataManagerTest {
@Test
def testLoadOffsetsAndGroup(): Unit = {
- val groupMetadataTopicPartition = groupTopicPartition
+ loadOffsetsAndGroup(groupTopicPartition, 2)
+ }
+
+ def loadOffsetsAndGroup(groupMetadataTopicPartition: TopicPartition,
groupEpoch: Int): GroupMetadata = {
val generation = 935
val protocolType = "consumer"
val protocol = "range"
@@ -650,7 +664,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -665,12 +679,92 @@ class GroupMetadataManagerTest {
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
}
+ group
+ }
+
+ @Test
+ def testLoadOffsetsAndGroupIgnored(): Unit = {
+ val groupEpoch = 2
+ loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch), _ => ())
+ assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+ "Removed group remained in cache")
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+
+ groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch
- 1, _ => (), 0L)
+ assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+ "Removed group remained in cache")
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+ }
+
+ @Test
+ def testUnloadOffsetsAndGroup(): Unit = {
+ val groupEpoch = 2
+ loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch), _ => ())
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+ assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+ "Removed group remained in cache")
+ }
+
+ @Test
+ def testUnloadOffsetsAndGroupIgnored(): Unit = {
+ val groupEpoch = 2
+ val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition,
Some(groupEpoch - 1), _ => ())
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
+ assertEquals(initiallyLoaded.groupId, group.groupId)
+ assertEquals(initiallyLoaded.currentState, group.currentState)
+ assertEquals(initiallyLoaded.leaderOrNull, group.leaderOrNull)
+ assertEquals(initiallyLoaded.generationId, group.generationId)
+ assertEquals(initiallyLoaded.protocolType, group.protocolType)
+ assertEquals(initiallyLoaded.protocolName.orNull,
group.protocolName.orNull)
+ assertEquals(initiallyLoaded.allMembers, group.allMembers)
+ assertEquals(initiallyLoaded.allOffsets.size, group.allOffsets.size)
+ initiallyLoaded.allOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition))
+
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
+ }
+ }
+
+ @Test
+ def testUnloadOffsetsAndGroupIgnoredAfterStopReplica(): Unit = {
+ val groupEpoch = 2
+ val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
+
+ groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, None, _
=> ())
+ assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
+ "Removed group remained in cache")
+ assertEquals(groupEpoch,
groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),
+ "Replica which was stopped still in epochForPartitionId")
+
+ EasyMock.reset(replicaManager)
+ loadOffsetsAndGroup(groupTopicPartition, groupEpoch + 1)
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
+ assertEquals(initiallyLoaded.groupId, group.groupId)
+ assertEquals(initiallyLoaded.currentState, group.currentState)
+ assertEquals(initiallyLoaded.leaderOrNull, group.leaderOrNull)
+ assertEquals(initiallyLoaded.generationId, group.generationId)
+ assertEquals(initiallyLoaded.protocolType, group.protocolType)
+ assertEquals(initiallyLoaded.protocolName.orNull,
group.protocolName.orNull)
+ assertEquals(initiallyLoaded.allMembers, group.allMembers)
+ assertEquals(initiallyLoaded.allOffsets.size, group.allOffsets.size)
+ initiallyLoaded.allOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition))
+
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
+ }
}
@Test
def testLoadGroupWithTombstone(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
+ val groupEpoch = 2
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
protocolType = "consumer", protocol = "range", memberId)
@@ -682,7 +776,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
assertEquals(None, groupMetadataManager.getGroup(groupId))
}
@@ -691,6 +785,7 @@ class GroupMetadataManagerTest {
def testLoadGroupWithLargeGroupMetadataRecord(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
@@ -711,7 +806,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
committedOffsets.foreach { case (topicPartition, offset) =>
@@ -726,6 +821,7 @@ class GroupMetadataManagerTest {
// is accidentally corrupted.
val startOffset = 0L
val endOffset = 10L
+ val groupEpoch = 2
val logMock: Log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
@@ -735,7 +831,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch,
_ => (), 0L)
EasyMock.verify(logMock)
EasyMock.verify(replicaManager)
@@ -754,6 +850,7 @@ class GroupMetadataManagerTest {
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -771,7 +868,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -788,6 +885,7 @@ class GroupMetadataManagerTest {
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
+ val groupEpoch = 2
val tp0 = new TopicPartition("foo", 0)
val tp1 = new TopicPartition("foo", 1)
val tp2 = new TopicPartition("bar", 0)
@@ -814,7 +912,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(logMock, replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, groupEpoch,
_ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -1982,6 +2080,7 @@ class GroupMetadataManagerTest {
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
@@ -1999,7 +2098,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -2023,6 +2122,7 @@ class GroupMetadataManagerTest {
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
@@ -2039,7 +2139,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
@@ -2153,6 +2253,7 @@ class GroupMetadataManagerTest {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
val generation = 15
+ val groupEpoch = 2
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
@@ -2190,7 +2291,7 @@ class GroupMetadataManagerTest {
EasyMock.replay(logMock)
EasyMock.replay(replicaManager)
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), 0L)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), 0L)
// Empty control batch should not have caused the load to fail
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new
AssertionError("Group was not loaded into the cache"))
@@ -2453,7 +2554,8 @@ class GroupMetadataManagerTest {
// When passed a specific start offset, assert that the measured values
are in excess of that.
val now = time.milliseconds()
val diff = 1000
- groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> (), now - diff)
+ val groupEpoch = 2
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition,
groupEpoch, _ => (), now - diff)
assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f059b42..6f7bb6b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1727,7 +1727,11 @@ class KafkaApisTest {
}
if (deletePartition) {
- groupCoordinator.onResignation(groupMetadataPartition.partition)
+ if (leaderEpoch >= 0) {
+ groupCoordinator.onResignation(groupMetadataPartition.partition,
Some(leaderEpoch))
+ } else {
+ groupCoordinator.onResignation(groupMetadataPartition.partition, None)
+ }
EasyMock.expectLastCall()
}