This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31f79055ceb KAFKA-17306; Soften the validation when replaying 
tombstones (#16898)
31f79055ceb is described below

commit 31f79055cebcc88d866dcd8eb331fd8b85557be9
Author: David Jacot <dja...@confluent.io>
AuthorDate: Tue Sep 10 16:28:36 2024 +0200

    KAFKA-17306; Soften the validation when replaying tombstones (#16898)
    
    This patch fixes a few bugs in the replay logic of the consumer group 
records:
    * The first issue is that the logic assumed that the group or the member 
exists when tombstones are replayed. Obviously, this is incorrect after a 
restart. The group or the member may not be there anymore if the 
__consumer_offsets partition only contains tombstones for the group or the 
member. The patch fixes this by considering tombstones as no-ops if the 
entity does not exist.
    * The second issue is that the logic assumed that consumer group records 
always appear in a specific order in the log, so it only accepted creating a 
consumer group when a `ConsumerGroupMemberMetadata` record was replayed. This 
is obviously incorrect too: during the lifetime of a consumer group, the 
records may appear in a different order. The patch fixes this by allowing any 
record to create the consumer group.
    * The third issue is that it is possible to replay offset commit records 
for a specific consumer group before the consumer group is actually created 
while replaying its records. By default, the OffsetMetadataManager creates a 
simple classic group to hold those offset commits. When the consumer group 
records are finally replayed, the logic fails because a classic group already 
exists. The patch fixes this by converting the simple classic group to a 
consumer group when records for a consumer group are replayed. A minimal 
sketch of this replay pattern follows the list.
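    
    As a minimal sketch of the new replay pattern (assuming simplified 
stand-in types; the real coordinator distinguishes classic and consumer 
groups, which this sketch collapses into one class for illustration):
    
        import java.util.HashMap;
        import java.util.Map;

        class ReplaySketch {
            static class GroupIdNotFoundException extends RuntimeException { }

            static class ConsumerGroup {
                boolean simpleClassic; // placeholder holding early offset commits
                ConsumerGroup(boolean simpleClassic) { this.simpleClassic = simpleClassic; }
            }

            final Map<String, ConsumerGroup> groups = new HashMap<>();

            // Replaying a tombstone (value == null) must tolerate a missing
            // group: after a restart, compaction may have left only tombstones.
            void replayMemberTombstone(String groupId, String memberId) {
                ConsumerGroup group;
                try {
                    group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
                } catch (GroupIdNotFoundException ex) {
                    return; // No-op: the group is already gone.
                }
                // ... remove the member, also tolerating an unknown member id.
            }

            ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
                String groupId,
                boolean createIfNotExists
            ) {
                ConsumerGroup group = groups.get(groupId);
                if (group == null && !createIfNotExists) {
                    throw new GroupIdNotFoundException();
                }
                if (group == null || group.simpleClassic) {
                    // Create the group, or replace the simple classic group
                    // that only held early offset commits; it is not backed
                    // by any records, so replacing it here is safe.
                    group = new ConsumerGroup(false);
                    groups.put(groupId, group);
                }
                return group;
            }
        }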
    
    All those combinations are hard to test with unit tests. This patch adds 
integration tests which reproduce some of those interleavings of records. I 
used them to reproduce the issues described above. An example interleaving is 
sketched below.
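    
    For example, after compaction, the __consumer_offsets partition can end 
up with an interleaving like the following (a hypothetical, simplified 
sequence; the record names are abbreviated):
    
        OffsetCommit(grp2, foo-0)                        // creates a simple classic group
        ConsumerGroupMemberMetadata(grp2, m1)            // must convert it, not fail
        ConsumerGroupMemberMetadata tombstone(grp2, m1)  // must be a no-op once m1 is gone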
    
    Reviewers: TengYao Chi <kiting...@gmail.com>, Jeff Kim 
<jeff....@confluent.io>, Justine Olshan <jols...@confluent.io>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../coordinator/group/CoordinatorLoaderImpl.scala  |  55 +++-
 .../api/GroupCoordinatorIntegrationTest.scala      | 339 ++++++++++++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +-
 .../coordinator/group/GroupMetadataManager.java    | 122 +++++---
 .../coordinator/group/classic/ClassicGroup.java    |   7 +
 .../coordinator/group/modern/ModernGroup.java      |   4 +-
 .../group/modern/consumer/ConsumerGroup.java       |   3 +-
 .../coordinator/group/modern/share/ShareGroup.java |   3 +-
 .../group/GroupMetadataManagerTest.java            | 169 +++++++++-
 9 files changed, 618 insertions(+), 88 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index d7033931cd0..3067e98fbe2 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -140,12 +140,20 @@ class CoordinatorLoaderImpl[T](
                 batch.asScala.foreach { record =>
                   val controlRecord = ControlRecordType.parse(record.key)
                   if (controlRecord == ControlRecordType.COMMIT) {
+                    if (isTraceEnabled) {
+                      trace(s"Replaying end transaction marker from $tp at 
offset ${record.offset} to commit transaction " +
+                        s"with producer id ${batch.producerId} and producer 
epoch ${batch.producerEpoch}.")
+                    }
                     coordinator.replayEndTransactionMarker(
                       batch.producerId,
                       batch.producerEpoch,
                       TransactionResult.COMMIT
                     )
                   } else if (controlRecord == ControlRecordType.ABORT) {
+                    if (isTraceEnabled) {
+                      trace(s"Replaying end transaction marker from $tp at 
offset ${record.offset} to abort transaction " +
+                        s"with producer id ${batch.producerId} and producer 
epoch ${batch.producerEpoch}.")
+                    }
                     coordinator.replayEndTransactionMarker(
                       batch.producerId,
                       batch.producerEpoch,
@@ -156,17 +164,42 @@ class CoordinatorLoaderImpl[T](
               } else {
                 batch.asScala.foreach { record =>
                   numRecords = numRecords + 1
-                  try {
-                    coordinator.replay(
-                      record.offset(),
-                      batch.producerId,
-                      batch.producerEpoch,
-                      deserializer.deserialize(record.key, record.value)
-                    )
-                  } catch {
-                    case ex: UnknownRecordTypeException =>
-                      warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
-                        s"from $tp. Ignoring it. It could be a left over from 
an aborted upgrade.")
+
+                  val coordinatorRecordOpt = {
+                    try {
+                      Some(deserializer.deserialize(record.key, record.value))
+                    } catch {
+                      case ex: UnknownRecordTypeException =>
+                        warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
+                          s"from $tp. Ignoring it. It could be a left over 
from an aborted upgrade.")
+                        None
+                      case ex: RuntimeException =>
+                        val msg = s"Deserializing record $record from $tp 
failed due to: ${ex.getMessage}"
+                        error(s"$msg.")
+                        throw new RuntimeException(msg, ex)
+                    }
+                  }
+
+                  coordinatorRecordOpt.foreach { coordinatorRecord =>
+                    try {
+                      if (isTraceEnabled) {
+                        trace(s"Replaying record $coordinatorRecord from $tp 
at offset ${record.offset()} " +
+                          s"with producer id ${batch.producerId} and producer 
epoch ${batch.producerEpoch}.")
+                      }
+                      coordinator.replay(
+                        record.offset(),
+                        batch.producerId,
+                        batch.producerEpoch,
+                        coordinatorRecord
+                      )
+                    } catch {
+                      case ex: RuntimeException =>
+                        val msg = s"Replaying record $coordinatorRecord from 
$tp at offset ${record.offset()} " +
+                          s"with producer id ${batch.producerId} and producer 
epoch ${batch.producerEpoch} " +
+                          s"failed due to: ${ex.getMessage}"
+                        error(s"$msg.")
+                        throw new RuntimeException(msg, ex)
+                    }
                   }
                 }
               }
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 2b92750883e..a666a740a14 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -12,53 +12,320 @@
  */
 package kafka.api
 
-import kafka.integration.KafkaServerTestHarness
 import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, 
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, 
TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
-import java.util.Properties
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.server.config.ServerConfigs
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
 
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
-  val offsetsTopicCompressionCodec = CompressionType.GZIP
-  val overridingProps = new Properties()
-  overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
"1")
-  
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
 offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull, enableControlledShutdown = false).map {
-    KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+  @ClusterTest(
+    types = Array(Type.KRAFT, Type.ZK),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "false"),
+    )
+  )
+  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+    withConsumer(groupId = "group", groupProtocol = GroupProtocol.CLASSIC) { 
consumer =>
+      consumer.commitSync(Map(
+        new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
+      ).asJava)
+
+      val logManager = cluster.brokers().asScala.head._2.logManager
+      def getGroupMetadataLogOpt: Option[UnifiedLog] =
+        logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
0))
+
+      TestUtils.waitUntilTrue(() => 
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+        "Commit message not appended in time")
+
+      val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+      val incorrectCompressionCodecs = logSegments
+        .flatMap(_.log.batches.asScala.map(_.compressionType))
+        .filter(_ != CompressionType.GZIP)
+
+      assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect 
compression codecs should be empty")
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithConsumerGroupMemberJoiningAndLeaving():
 Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a consumer group grp1 with one member. The member subscribes 
to foo and leaves. This creates
+      // a mix of group records with tombstones to delete the member.
+      withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) { 
consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment.asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(List("grp1").asJava)
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp1", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverCompactingPartitionWithManualOffsetCommitsAndConsumerGroupMemberUnsubscribingAndResubscribing():
 Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a consumer group grp2 with one member. The member subscribes 
to foo, manually commits offsets,
+      // unsubscribes and finally re-subscribes to foo. This creates a mix of 
group records with tombstones
+      // and ensures that all the offset commit records are before the consumer 
group records due to the
+      // rebalance after the commit sync.
+      withConsumer(groupId = "grp2", groupProtocol = GroupProtocol.CONSUMER, 
enableAutoCommit = false) { consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+        consumer.commitSync()
+        consumer.unsubscribe()
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(List("grp2").asJava)
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp2", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithConsumerGroupDeleted(): Unit 
= {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a consumer group grp3 with one member. The member subscribes 
to foo and leaves the group. Then
+      // the group is deleted. This creates tombstones to delete the member, 
the group and the offsets.
+      withConsumer(groupId = "grp3", groupProtocol = GroupProtocol.CONSUMER) { 
consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+
+      admin
+        .deleteConsumerGroups(List("grp3").asJava)
+        .deletedGroups()
+        .get("grp3")
+        .get(10, TimeUnit.SECONDS)
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(List("grp3").asJava)
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp3", GroupType.CLASSIC, 
ConsumerGroupState.DEAD)
+    }
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: 
String): Unit = {
-    val consumer = TestUtils.createConsumer(bootstrapServers())
-    val offsetMap = Map(
-      new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new 
OffsetAndMetadata(10, "")
-    ).asJava
-    consumer.commitSync(offsetMap)
-    val logManager = brokers.head.logManager
-    def getGroupMetadataLogOpt: Option[UnifiedLog] =
-      logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
-    TestUtils.waitUntilTrue(() => 
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
-                            "Commit message not appended in time")
-
-    val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
-    val incorrectCompressionCodecs = logSegments
-      .flatMap(_.log.batches.asScala.map(_.compressionType))
-      .filter(_ != offsetsTopicCompressionCodec)
-    assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression 
codecs should be empty")
-
-    consumer.close()
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedConsumerGroup(): 
Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a classic group grp4 with one member, then upgrade the group 
to the consumer
+      // protocol.
+      withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC) { 
consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+
+      withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER) { 
consumer =>
+        consumer.subscribe(List("foo").asJava)
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(List("grp4").asJava)
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp4", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
+  }
+
+  private def rollAndCompactConsumerOffsets(): Unit = {
+    val tp = new TopicPartition("__consumer_offsets", 0)
+    val broker = cluster.brokers.asScala.head._2
+    val log = broker.logManager.getLog(tp).get
+    log.roll()
+    assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0))
+  }
+
+  private def withAdmin(f: Admin => Unit): Unit = {
+    val admin: Admin = cluster.createAdminClient()
+    try {
+      f(admin)
+    } finally {
+      admin.close()
+    }
+  }
+
+  private def withConsumer(
+    groupId: String,
+    groupProtocol: GroupProtocol,
+    enableAutoCommit: Boolean = true
+  )(f: Consumer[Array[Byte], Array[Byte]] => Unit): Unit = {
+    val consumer = TestUtils.createConsumer(
+      brokerList = cluster.bootstrapServers(),
+      groupId = groupId,
+      groupProtocol = groupProtocol,
+      enableAutoCommit = enableAutoCommit
+    )
+    try {
+      f(consumer)
+    } finally {
+      consumer.close()
+    }
+  }
+
+  private def assertDescribedGroup(
+    groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+    groupId: String,
+    groupType: GroupType,
+    state: ConsumerGroupState
+  ): Unit = {
+    val group = groups(groupId).get(10, TimeUnit.SECONDS)
+
+    assertEquals(groupId, group.groupId)
+    assertEquals(groupType, group.`type`)
+    assertEquals(state, group.state)
+    assertEquals(Collections.emptyList, group.members)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c3a19864321..f503caf43c4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -748,9 +748,11 @@ object TestUtils extends Logging {
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
                            keyDeserializer: Deserializer[K] = new 
ByteArrayDeserializer,
-                           valueDeserializer: Deserializer[V] = new 
ByteArrayDeserializer): Consumer[K, V] = {
+                           valueDeserializer: Deserializer[V] = new 
ByteArrayDeserializer,
+                           groupProtocol: GroupProtocol = 
GroupProtocol.CLASSIC): Consumer[K, V] = {
     val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.toString)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
enableAutoCommit.toString)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4816f780ab6..58d0070af3e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -793,8 +793,7 @@ public class GroupMetadataManager {
             } else if (createIfNotExists && group.type() == CLASSIC && 
validateOnlineUpgrade((ClassicGroup) group)) {
                 return convertToConsumerGroup((ClassicGroup) group, records);
             } else {
-                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group.",
-                    groupId));
+                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group", groupId));
             }
         }
     }
@@ -817,10 +816,7 @@ public class GroupMetadataManager {
         if (group.type() == CONSUMER) {
             return (ConsumerGroup) group;
         } else {
-            // We don't support upgrading/downgrading between protocols at the 
moment so
-            // we throw an exception if a group exists with the wrong type.
-            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group.",
-                groupId));
+            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group", groupId));
         }
     }
 
@@ -842,18 +838,19 @@ public class GroupMetadataManager {
      *                          created if it does not exist.
      *
      * @return A ConsumerGroup.
-     * @throws IllegalStateException if the group does not exist and 
createIfNotExists is false or
-     *                               if the group is not a consumer group.
+     * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
+     *                                  if the group is not a consumer group.
+     * @throws IllegalStateException    if the group does not have the 
expected type.
      * Package private for testing.
      */
     ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
         String groupId,
         boolean createIfNotExists
-    ) throws GroupIdNotFoundException {
+    ) throws GroupIdNotFoundException, IllegalStateException {
         Group group = groups.get(groupId);
 
         if (group == null && !createIfNotExists) {
-            throw new IllegalStateException(String.format("Consumer group %s 
not found.", groupId));
+            throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found", groupId));
         }
 
         if (group == null) {
@@ -861,14 +858,20 @@ public class GroupMetadataManager {
             groups.put(groupId, consumerGroup);
             metrics.onConsumerGroupStateTransition(null, 
consumerGroup.state());
             return consumerGroup;
+        } else if (group.type() == CONSUMER) {
+            return (ConsumerGroup) group;
+        } else if (group.type() == CLASSIC && ((ClassicGroup) 
group).isSimpleGroup()) {
+            // If the group is a simple classic group, it was automatically 
created to hold committed
+            // offsets if no group existed. Simple classic groups are not 
backed by any records
+            // in the __consumer_offsets topic, hence we can safely replace 
the group here. Without this,
+            // replaying consumer group records after offset commit records 
would not work.
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+            groups.put(groupId, consumerGroup);
+            metrics.onConsumerGroupStateTransition(null, 
consumerGroup.state());
+            metrics.onClassicGroupStateTransition(EMPTY, null);
+            return consumerGroup;
         } else {
-            if (group.type() == CONSUMER) {
-                return (ConsumerGroup) group;
-            } else {
-                // We don't support upgrading/downgrading between protocols at 
the moment so
-                // we throw an exception if a group exists with the wrong type.
-                throw new IllegalStateException(String.format("Group %s is not 
a consumer group.", groupId));
-            }
+            throw new IllegalStateException(String.format("Group %s is not a 
consumer group", groupId));
         }
     }
 
@@ -904,10 +907,7 @@ public class GroupMetadataManager {
             if (group.type() == CLASSIC) {
                 return (ClassicGroup) group;
             } else {
-                // We don't support upgrading/downgrading between protocols at 
the moment so
-                // we throw an exception if a group exists with the wrong type.
-                throw new GroupIdNotFoundException(String.format("Group %s is 
not a classic group.",
-                    groupId));
+                throw new GroupIdNotFoundException(String.format("Group %s is 
not a classic group.", groupId));
             }
         }
     }
@@ -930,10 +930,7 @@ public class GroupMetadataManager {
         if (group.type() == CLASSIC) {
             return (ClassicGroup) group;
         } else {
-            // We don't support upgrading/downgrading between protocols at the 
moment so
-            // we throw an exception if a group exists with the wrong type.
-            throw new GroupIdNotFoundException(String.format("Group %s is not 
a classic group.",
-                groupId));
+            throw new GroupIdNotFoundException(String.format("Group %s is not 
a classic group.", groupId));
         }
     }
 
@@ -3216,7 +3213,14 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, 
value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can 
ignore it.
+            return;
+        }
+
         Set<String> oldSubscribedTopicNames = new 
HashSet<>(consumerGroup.subscribedTopicNames().keySet());
 
         if (value != null) {
@@ -3225,7 +3229,14 @@ public class GroupMetadataManager {
                 .updateWith(value)
                 .build());
         } else {
-            ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, false);
+            ConsumerGroupMember oldMember;
+            try {
+                oldMember = consumerGroup.getOrMaybeCreateMember(memberId, 
false);
+            } catch (UnknownMemberIdException ex) {
+                // If the member does not exist, we can ignore it.
+                return;
+            }
+
             if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
                 throw new IllegalStateException("Received a tombstone record 
to delete member " + memberId
                     + " but did not receive 
ConsumerGroupCurrentMemberAssignmentValue tombstone.");
@@ -3331,7 +3342,14 @@ public class GroupMetadataManager {
             ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
             consumerGroup.setGroupEpoch(value.epoch());
         } else {
-            ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
+            ConsumerGroup consumerGroup;
+            try {
+                consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+
             if (!consumerGroup.members().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the group still has " + 
consumerGroup.members().size() + " members.");
@@ -3363,7 +3381,14 @@ public class GroupMetadataManager {
         ConsumerGroupPartitionMetadataValue value
     ) {
         String groupId = key.groupId();
-        ModernGroup<?> group = getOrMaybeCreatePersistedConsumerGroup(groupId, 
false);
+
+        ConsumerGroup group;
+        try {
+            group = getOrMaybeCreatePersistedConsumerGroup(groupId, value != 
null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist, we can ignore the tombstone.
+            return;
+        }
 
         if (value != null) {
             Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
@@ -3389,11 +3414,18 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
         String memberId = key.memberId();
-        ModernGroup<?> group = getOrMaybeCreatePersistedConsumerGroup(groupId, 
false);
 
         if (value != null) {
+            ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
             group.updateTargetAssignment(memberId, 
Assignment.fromRecord(value));
         } else {
+            ConsumerGroup group;
+            try {
+                group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
             group.removeTargetAssignment(memberId);
         }
     }
@@ -3411,11 +3443,18 @@ public class GroupMetadataManager {
         ConsumerGroupTargetAssignmentMetadataValue value
     ) {
         String groupId = key.groupId();
-        ModernGroup<?> group = getOrMaybeCreatePersistedConsumerGroup(groupId, 
false);
 
         if (value != null) {
+            ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
             group.setTargetAssignmentEpoch(value.assignmentEpoch());
         } else {
+            ConsumerGroup group;
+            try {
+                group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
             if (!group.targetAssignment().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete target assignment of " + groupId
                     + " but the assignment still has " + 
group.targetAssignment().size() + " members.");
@@ -3438,15 +3477,30 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, 
false);
-        ConsumerGroupMember oldMember = group.getOrMaybeCreateMember(memberId, 
false);
-
         if (value != null) {
+            ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
+            ConsumerGroupMember oldMember = 
group.getOrMaybeCreateMember(memberId, true);
             ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(oldMember)
                 .updateWith(value)
                 .build();
             group.updateMember(newMember);
         } else {
+            ConsumerGroup group;
+            try {
+                group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
+            } catch (GroupIdNotFoundException ex) {
+                // If the group does not exist, we can ignore the tombstone.
+                return;
+            }
+
+            ConsumerGroupMember oldMember;
+            try {
+                oldMember = group.getOrMaybeCreateMember(memberId, false);
+            } catch (UnknownMemberIdException ex) {
+                // If the member does not exist, we can ignore the tombstone.
+                return;
+            }
+
             ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(oldMember)
                 .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
                 .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 9d1181a2fe2..907380289e6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -292,6 +292,13 @@ public class ClassicGroup implements Group {
         return this.protocolType;
     }
 
+    /**
+     * @return True if the group is a simple group, i.e. an empty group with 
no protocol type and no pending members.
+     */
+    public boolean isSimpleGroup() {
+        return !protocolType.isPresent() && isEmpty() && 
pendingJoinMembers.isEmpty();
+    }
+
     /**
      * @return the current group state.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 3bcbeedc552..508b1c7d131 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.modern;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.Utils;
@@ -578,8 +579,9 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      *                          created if it does not exist.
      *
      * @return A ConsumerGroupMember.
+     * @throws UnknownMemberIdException when the member does not exist and 
createIfNotExists is false.
      */
-    public abstract T getOrMaybeCreateMember(String memberId, boolean 
createIfNotExists);
+    public abstract T getOrMaybeCreateMember(String memberId, boolean 
createIfNotExists) throws UnknownMemberIdException;
 
     /**
      * Adds or updates the member.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 556e67fe985..817cf7bbe24 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -221,11 +221,12 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *                          created if it does not exist.
      *
      * @return A ConsumerGroupMember.
+     * @throws UnknownMemberIdException when the member does not exist and 
createIfNotExists is false.
      */
     public ConsumerGroupMember getOrMaybeCreateMember(
         String memberId,
         boolean createIfNotExists
-    ) {
+    ) throws UnknownMemberIdException {
         ConsumerGroupMember member = members.get(memberId);
         if (member != null) return member;
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index b7c0c433842..1e641dfbe87 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -132,11 +132,12 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
      *                          created if it does not exist.
      *
      * @return A ShareGroupMember.
+     * @throws UnknownMemberIdException when the member does not exist and 
createIfNotExists is false.
      */
     public ShareGroupMember getOrMaybeCreateMember(
         String memberId,
         boolean createIfNotExists
-    ) {
+    ) throws UnknownMemberIdException {
         ShareGroupMember member = members.get(memberId);
         if (member != null) return member;
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 22ae88aaf53..4fbe8967ad7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9569,7 +9569,7 @@ public class GroupMetadataManagerTest {
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 
         // Replaying a tombstone for a group that has already been removed 
should not decrement metric.
-        tombstones.forEach(tombstone -> 
assertThrows(IllegalStateException.class, () -> context.replay(tombstone)));
+        tombstones.forEach(context::replay);
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
     }
 
@@ -9586,8 +9586,8 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
         IntStream.range(0, 3).forEach(__ -> {
-            assertThrows(IllegalStateException.class, () -> 
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")));
-            assertThrows(IllegalStateException.class, () -> 
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")));
+            
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
+            
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
         });
 
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, 
ConsumerGroup.ConsumerGroupState.EMPTY);
@@ -14285,6 +14285,169 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testReplayConsumerGroupMemberMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setServerAssignorName("range")
+            .setRackId("rackid")
+            .setSubscribedTopicNames(Collections.singletonList("foo"))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayConsumerGroupMemberMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // ConsumerGroupMemberMetadata tombstone should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 
10));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
ConsumerGroupMemberMetadata tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
+    }
+
+    @Test
+    public void testReplayConsumerGroupMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 
10));
+        assertEquals(10, 
context.groupMetadataManager.consumerGroup("foo").groupEpoch());
+    }
+
+    @Test
+    public void testReplayConsumerGroupMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the ConsumerGroupMetadata 
tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("foo"));
+    }
+
+    @Test
+    public void testReplayConsumerGroupPartitionMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<String, TopicMetadata> metadata = Collections.singletonMap(
+            "bar",
+            new TopicMetadata(Uuid.randomUuid(), "bar", 10, 
Collections.emptyMap())
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord("foo",
 metadata));
+        assertEquals(metadata, 
context.groupMetadataManager.consumerGroup("foo").subscriptionMetadata());
+    }
+
+    @Test
+    public void testReplayConsumerGroupPartitionMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
ConsumerGroupPartitionMetadata tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("foo"));
+    }
+
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMember() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        Map<Uuid, Set<Integer>> assignment = mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2)
+        );
+
+        // The group is created if it does not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord("foo",
 "m1", assignment));
+        assertEquals(assignment, 
context.groupMetadataManager.consumerGroup("foo").targetAssignment("m1").partitions());
+    }
+
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMemberTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
ConsumerGroupTargetAssignmentMember tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("foo"));
+    }
+
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord("foo",
 10));
+        assertEquals(10, 
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+    }
+
+    @Test
+    public void testReplayConsumerGroupTargetAssignmentMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
ConsumerGroupTargetAssignmentMetadata tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("foo"));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignment() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2)))
+            .build();
+
+        // The group and the member are created if they do not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord("bar",
 member));
+        assertEquals(member, 
context.groupMetadataManager.consumerGroup("bar").getOrMaybeCreateMember("member",
 false));
+    }
+
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group still exists but the member is already gone. Replaying the
+        // ConsumerGroupCurrentMemberAssignment tombstone should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 
10));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "m1"));
+        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", 
false));
+
+        // The group may not exist at all. Replaying the 
ConsumerGroupCurrentMemberAssignment tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("bar",
 "m1"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
+    }
+
     @Test
     public void testConsumerGroupHeartbeatOnShareGroup() {
         String groupId = "group-foo";
