This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9eae3388cb KAFKA-20254: Add integration tests for group migration with compaction and broker restart (#21642)
b9eae3388cb is described below

commit b9eae3388cb8145c3e43130f66526927f804a749
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 10 15:12:54 2026 +0100

    KAFKA-20254: Add integration tests for group migration with compaction and broker restart (#21642)
    
    This adds integration tests to GroupCoordinatorIntegrationTest that
    verify the group coordinator loads correctly after compaction and broker
    restart when a group has been upgraded from the classic protocol to the
    consumer or streams protocol.
    
    For each protocol upgrade path (classic-to-consumer and
    classic-to-streams), there are two test variants. The first variant
    compacts __consumer_offsets but retains the GroupMetadata tombstone,
    verifying that replaying tombstones after compaction works correctly.
    The second variant forces tombstone removal by running two compaction
    passes with delete.retention.ms=0 and min.cleanable.dirty.ratio=0. This
    reproduces the KAFKA-20254 scenario: after the tombstone is removed, the
    classic group's offset commit records precede the consumer/streams group
    records in the log. During replay, these offset commits create a simple
    classic group, which the consumer/streams group replay logic must handle
    via the isSimpleGroup() fix in
    getOrMaybeCreatePersistedConsumerGroup/getOrMaybeCreatePersistedStreamsGroup.
    
    In the tombstone-removed variants, the upgraded group avoids committing
    offsets so that the classic group's offset commits are not overwritten
    during compaction and survive at their original early position in the
    log, naturally triggering the bug without the fix.
    
    Reviewers: David Jacot <[email protected]>, Matthias J. Sax <[email protected]>
---
 build.gradle                                       |   1 +
 .../api/GroupCoordinatorIntegrationTest.scala      | 355 ++++++++++++++++++++-
 2 files changed, 351 insertions(+), 5 deletions(-)

diff --git a/build.gradle b/build.gradle
index 88e8dd81981..23a03b0782a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1114,6 +1114,7 @@ project(':core') {
     testImplementation project(':server-common').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.test.output
     testImplementation project(':server').sourceSets.test.output
+    testImplementation project(':streams')
     testImplementation project(':test-common:test-common-runtime')
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-util')
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 5e22d7a86c3..acd97b7009c 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -14,10 +14,12 @@ package kafka.api
 
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
Type}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
-import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, 
OffsetAndMetadata}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
GroupProtocol, OffsetAndMetadata}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{GroupIdNotFoundException, 
UnknownTopicOrPartitionException}
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, 
TopicCollection, TopicPartition}
+import org.apache.kafka.common.{ConsumerGroupState, GroupState, GroupType, 
KafkaFuture, TopicCollection, TopicPartition}
+import org.apache.kafka.common.serialization.Serdes
 import org.junit.jupiter.api.Assertions._
 
 import scala.jdk.CollectionConverters._
@@ -26,11 +28,14 @@ import org.apache.kafka.common.record.internal.CompressionType
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.ServerConfigs
+import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
+import org.apache.kafka.streams.{GroupProtocol => StreamsGroupProtocol}
 import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Timeout
 
 import java.time.Duration
+import java.util.Properties
 import java.util.concurrent.TimeUnit
 import scala.concurrent.ExecutionException
 
@@ -279,6 +284,295 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     }
   }
 
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedConsumerGroupAndTombstoneRemoved(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a classic group with one member and commit offsets.
+      withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC, 
enableAutoCommit = false) { consumer =>
+        consumer.subscribe(java.util.List.of("foo"))
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+        consumer.commitSync()
+      }
+
+      // Set delete.retention.ms=0 before the tombstone is written so that
+      // compaction will remove it.
+      configureDeleteRetention()
+
+      // Upgrade the group to the consumer protocol. Don't commit offsets
+      // so the classic group's offset commits survive compaction.
+      withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER, 
enableAutoCommit = false) { consumer =>
+        consumer.subscribe(java.util.List.of("foo"))
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+    }
+
+    // Force compaction twice to remove tombstones: the first pass sets
+    // deleteHorizonMs, and the second pass removes them.
+    rollAndCompactConsumerOffsets()
+    writeOneOffsetCommit()
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. Without the fix for KAFKA-20254, the offset
+    // commit records create a simple classic group during replay and the
+    // consumer group records fail to load.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(java.util.List.of("grp4"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp4", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleConsumerGroup(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a simple classic group by committing offsets directly
+      // without subscribing. This only writes offset commit records
+      // without any GroupMetadata records.
+      withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CLASSIC, 
enableAutoCommit = false) { consumer =>
+        val tp = new TopicPartition("foo", 0)
+        consumer.assign(java.util.List.of(tp))
+        consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+      }
+
+      // Upgrade the group to the consumer protocol. Don't commit offsets
+      // so the simple classic group's offset commits survive compaction.
+      withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CONSUMER, 
enableAutoCommit = false) { consumer =>
+        consumer.subscribe(java.util.List.of("foo"))
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+    }
+
+    // Force a compaction. Since a simple classic group has no
+    // GroupMetadata records, there are no tombstones — the offset
+    // commit records always survive compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. Without the fix for KAFKA-20254, the offset
+    // commit records create a simple classic group during replay and the
+    // consumer group records fail to load.
+    withAdmin { admin =>
+      val groups = admin
+        .describeConsumerGroups(java.util.List.of("grp6"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedGroup(groups, "grp6", GroupType.CONSUMER, 
ConsumerGroupState.EMPTY)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroup(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a classic group with one member.
+      withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC) { 
consumer =>
+        consumer.subscribe(java.util.List.of("foo"))
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+      }
+
+      // Upgrade the group to the streams protocol.
+      withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp5"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      val group = groups("grp5").get(10, TimeUnit.SECONDS)
+      assertEquals("grp5", group.groupId)
+      assertEquals(GroupState.EMPTY, group.groupState)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroupAndTombstoneRemoved(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a classic group with one member and commit offsets.
+      withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC, 
enableAutoCommit = false) { consumer =>
+        consumer.subscribe(java.util.List.of("foo"))
+        TestUtils.waitUntilTrue(() => {
+          consumer.poll(Duration.ofMillis(50))
+          consumer.assignment().asScala.nonEmpty
+        }, msg = "Consumer did not get an non empty assignment")
+        consumer.commitSync()
+      }
+
+      // Set delete.retention.ms=0 before the tombstone is written so that
+      // compaction will remove it.
+      configureDeleteRetention()
+
+      // Upgrade the group to the streams protocol.
+      withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+    }
+
+    // Force compaction twice to remove tombstones: the first pass sets
+    // deleteHorizonMs, and the second pass removes them.
+    rollAndCompactConsumerOffsets()
+    writeOneOffsetCommit()
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. Without the fix for KAFKA-20254, the offset
+    // commit records create a simple classic group during replay and the
+    // streams group records fail to load.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp5"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      val group = groups("grp5").get(10, TimeUnit.SECONDS)
+      assertEquals("grp5", group.groupId)
+      assertEquals(GroupState.EMPTY, group.groupState)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleStreamsGroup(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a simple classic group by committing offsets directly
+      // without subscribing. This only writes offset commit records
+      // without any GroupMetadata records.
+      withConsumer(groupId = "grp7", groupProtocol = GroupProtocol.CLASSIC, 
enableAutoCommit = false) { consumer =>
+        val tp = new TopicPartition("foo", 0)
+        consumer.assign(java.util.List.of(tp))
+        consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+      }
+
+      // Upgrade the group to the streams protocol.
+      withStreamsApp(applicationId = "grp7", inputTopic = "foo")
+    }
+
+    // Force a compaction. Since a simple classic group has no
+    // GroupMetadata records, there are no tombstones — the offset
+    // commit records always survive compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. Without the fix for KAFKA-20254, the offset
+    // commit records create a simple classic group during replay and the
+    // streams group records fail to load.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp7"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      val group = groups("grp7").get(10, TimeUnit.SECONDS)
+      assertEquals("grp7", group.groupId)
+      assertEquals(GroupState.EMPTY, group.groupState)
+    }
+  }
+
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
@@ -331,12 +625,34 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     }
   }
 
+  private def writeOneOffsetCommit(): Unit = {
+    // Write a single offset commit to create dirty data past the cleaner
+    // checkpoint so the cleaner will re-process previously compacted
+    // segments on the next compaction pass.
+    withConsumer(groupId = "compaction-trigger", groupProtocol = 
GroupProtocol.CLASSIC, enableAutoCommit = false) { consumer =>
+      val tp = new TopicPartition("foo", 0)
+      consumer.assign(java.util.List.of(tp))
+      consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+    }
+  }
+
+  private def configureDeleteRetention(): Unit = {
+    withAdmin { admin =>
+      val resource = new ConfigResource(ConfigResource.Type.TOPIC, 
Topic.GROUP_METADATA_TOPIC_NAME)
+      admin.incrementalAlterConfigs(java.util.Map.of(resource, 
java.util.List.of(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.DELETE_RETENTION_MS_CONFIG, "0"), 
AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.0"), 
AlterConfigOp.OpType.SET)
+      ))).all().get()
+    }
+  }
+
   private def rollAndCompactConsumerOffsets(): Unit = {
-    val tp = new TopicPartition("__consumer_offsets", 0)
+    val tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
     val broker = cluster.brokers.asScala.head._2
     val log = broker.logManager.getLog(tp).get
+    val endOffset = log.logEndOffset
     log.roll()
-    assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0, 60000L))
+    assertTrue(broker.logManager.cleaner.awaitCleaned(tp, endOffset, 60000L))
   }
 
   private def withAdmin(f: Admin => Unit): Unit = {
@@ -366,6 +682,35 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     }
   }
 
+  private def withStreamsApp(
+    applicationId: String,
+    inputTopic: String
+  ): Unit = {
+    val builder = new StreamsBuilder()
+    builder.stream(inputTopic)
+
+    val props = new Properties()
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers())
+    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
classOf[Serdes.StringSerde].getName)
+    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
classOf[Serdes.StringSerde].getName)
+    props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
StreamsGroupProtocol.STREAMS.name())
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "500")
+    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "100")
+
+    val streams = new KafkaStreams(builder.build(), props)
+    try {
+      streams.start()
+      TestUtils.waitUntilTrue(
+        () => streams.state() == KafkaStreams.State.RUNNING,
+        msg = "Streams app did not reach RUNNING state"
+      )
+    } finally {
+      streams.close(Duration.ofSeconds(30))
+      streams.cleanUp()
+    }
+  }
+
   private def assertDescribedGroup(
     groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
     groupId: String,

Reply via email to