This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9eae3388cb KAFKA-20254: Add integration tests for group migration
with compaction and broker restart (#21642)
b9eae3388cb is described below
commit b9eae3388cb8145c3e43130f66526927f804a749
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 10 15:12:54 2026 +0100
KAFKA-20254: Add integration tests for group migration with compaction and
broker restart (#21642)
This adds integration tests to GroupCoordinatorIntegrationTest that
verify the group coordinator loads correctly after compaction and broker
restart when a group has been upgraded from the classic protocol to the
consumer or streams protocol.
For each protocol upgrade path (classic-to-consumer and
classic-to-streams), there are two test variants. The first variant
compacts __consumer_offsets but retains the GroupMetadata tombstone,
verifying that replaying tombstones after compaction works correctly.
The second variant forces tombstone removal by running two compaction
passes with delete.retention.ms=0 and min.cleanable.dirty.ratio=0. This
reproduces the KAFKA-20254 scenario: after the tombstone is removed, the
classic group's offset commit records precede the consumer/streams group
records in the log. During replay, these offset commits create a simple
classic group, which the consumer/streams group replay logic must handle
via the isSimpleGroup() fix in
getOrMaybeCreatePersistedConsumerGroup/getOrMaybeCreatePersistedStreamsGroup.
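In the tombstone-removed variants, the upgraded group avoids committing
offsets so that the classic group's offset commits are not overwritten
during compaction and survive at their original early position in the
log, naturally triggering the bug without the fix.
In addition, each upgrade path gets a test that starts from a simple
classic group, created by committing offsets via assign() without
subscribing. Such a group writes only offset commit records, so there is
no GroupMetadata record or tombstone involved and a single compaction
pass is sufficient.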
Reviewers: David Jacot <[email protected]>, Matthias J. Sax
<[email protected]>
---
build.gradle | 1 +
.../api/GroupCoordinatorIntegrationTest.scala | 355 ++++++++++++++++++++-
2 files changed, 351 insertions(+), 5 deletions(-)
diff --git a/build.gradle b/build.gradle
index 88e8dd81981..23a03b0782a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1114,6 +1114,7 @@ project(':core') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
+ testImplementation project(':streams')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-util')
diff --git
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 5e22d7a86c3..acd97b7009c 100644
---
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -14,10 +14,12 @@ package kafka.api
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
Type}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
-import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry,
ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
GroupProtocol, OffsetAndMetadata}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{GroupIdNotFoundException,
UnknownTopicOrPartitionException}
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicCollection, TopicPartition}
+import org.apache.kafka.common.{ConsumerGroupState, GroupState, GroupType,
KafkaFuture, TopicCollection, TopicPartition}
+import org.apache.kafka.common.serialization.Serdes
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
@@ -26,11 +28,14 @@ import
org.apache.kafka.common.record.internal.CompressionType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ServerConfigs
+import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
+import org.apache.kafka.streams.{GroupProtocol => StreamsGroupProtocol}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Timeout
import java.time.Duration
+import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionException
@@ -279,6 +284,295 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedConsumerGroupAndTombstoneRemoved():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member and commit offsets.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ }
+
+ // Set delete.retention.ms=0 and min.cleanable.dirty.ratio=0.0 before the
+ // tombstone is written so that compaction will remove it.
+ configureDeleteRetention()
+
+ // Upgrade the group to the consumer protocol. Don't commit offsets
+ // so the classic group's offset commits survive compaction.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force compaction twice to remove tombstones: the first pass sets
+ // deleteHorizonMs, and the second pass removes them.
+ rollAndCompactConsumerOffsets()
+ writeOneOffsetCommit() // new dirty data so the second pass re-processes the compacted segments
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // consumer group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(java.util.List.of("grp4"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp4", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleConsumerGroup():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a simple classic group by committing offsets directly
+ // without subscribing. This only writes offset commit records
+ // without any GroupMetadata records.
+ withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0)
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+ }
+
+ // Upgrade the group to the consumer protocol. Don't commit offsets
+ // so the simple classic group's offset commits survive compaction.
+ withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force a compaction. Since a simple classic group has no
+ // GroupMetadata records, there are no tombstones — the offset
+ // commit records always survive compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // consumer group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(java.util.List.of("grp6"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp6", GroupType.CONSUMER,
ConsumerGroupState.EMPTY) // NOTE(review): presumably EMPTY because withConsumer closed the only member; confirm
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroup(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member.
+ withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC) {
consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Upgrade the group to the streams protocol.
+ withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+ }
+
+ // Force one compaction pass. delete.retention.ms is not lowered in this test.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp5"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp5").get(10, TimeUnit.SECONDS)
+ assertEquals("grp5", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroupAndTombstoneRemoved():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member and commit offsets.
+ withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ }
+
+ // Set delete.retention.ms=0 and min.cleanable.dirty.ratio=0.0 before the
+ // tombstone is written so that compaction will remove it.
+ configureDeleteRetention()
+
+ // Upgrade the group to the streams protocol.
+ withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+ }
+
+ // Force compaction twice to remove tombstones: the first pass sets
+ // deleteHorizonMs, and the second pass removes them.
+ rollAndCompactConsumerOffsets()
+ writeOneOffsetCommit() // new dirty data so the second pass re-processes the compacted segments
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // streams group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp5"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp5").get(10, TimeUnit.SECONDS)
+ assertEquals("grp5", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleStreamsGroup():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a simple classic group by committing offsets directly
+ // without subscribing. This only writes offset commit records
+ // without any GroupMetadata records.
+ withConsumer(groupId = "grp7", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0)
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+ }
+
+ // Upgrade the group to the streams protocol; the app is closed once RUNNING.
+ withStreamsApp(applicationId = "grp7", inputTopic = "foo")
+ }
+
+ // Force a compaction. Since a simple classic group has no
+ // GroupMetadata records, there are no tombstones — the offset
+ // commit records always survive compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // streams group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp7"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp7").get(10, TimeUnit.SECONDS)
+ assertEquals("grp7", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
@@ -331,12 +625,34 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ private def writeOneOffsetCommit(): Unit = {
+ // Write a single offset commit to create dirty data past the cleaner
+ // checkpoint so the cleaner will re-process previously compacted
+ // segments on the next compaction pass.
+ withConsumer(groupId = "compaction-trigger", groupProtocol =
GroupProtocol.CLASSIC, enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0) // topic "foo" is created by both calling tests
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0))) // committed under a separate group id, not the group under test
+ }
+ }
+
+ private def configureDeleteRetention(): Unit = { // Sets delete.retention.ms=0 and min.cleanable.dirty.ratio=0.0 on the offsets topic.
+ withAdmin { admin =>
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC,
Topic.GROUP_METADATA_TOPIC_NAME)
+ admin.incrementalAlterConfigs(java.util.Map.of(resource,
java.util.List.of(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.DELETE_RETENTION_MS_CONFIG, "0"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.0"),
AlterConfigOp.OpType.SET)
+ ))).all().get() // block until the alter-configs call completes
+ }
+ }
+
private def rollAndCompactConsumerOffsets(): Unit = {
- val tp = new TopicPartition("__consumer_offsets", 0)
+ val tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val broker = cluster.brokers.asScala.head._2
val log = broker.logManager.getLog(tp).get
+ val endOffset = log.logEndOffset // read before roll(); replaces the constant 0 previously passed to awaitCleaned
log.roll()
- assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0, 60000L))
+ assertTrue(broker.logManager.cleaner.awaitCleaned(tp, endOffset, 60000L))
}
private def withAdmin(f: Admin => Unit): Unit = {
@@ -366,6 +682,35 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ private def withStreamsApp( // Starts a Streams app reading inputTopic, waits for RUNNING, then closes it.
+ applicationId: String,
+ inputTopic: String
+ ): Unit = {
+ val builder = new StreamsBuilder()
+ builder.stream(inputTopic) // source-only topology: the returned stream is not used further
+
+ val props = new Properties()
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId) // callers pass the id of the classic group being upgraded
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers())
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde].getName)
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde].getName)
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
StreamsGroupProtocol.STREAMS.name())
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "500")
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "100")
+
+ val streams = new KafkaStreams(builder.build(), props)
+ try {
+ streams.start()
+ TestUtils.waitUntilTrue(
+ () => streams.state() == KafkaStreams.State.RUNNING,
+ msg = "Streams app did not reach RUNNING state"
+ )
+ } finally {
+ streams.close(Duration.ofSeconds(30)) // NOTE(review): the boolean result (false on timeout) is ignored; confirm that is intended
+ streams.cleanUp() // NOTE(review): documented to throw IllegalStateException if the instance is still running, e.g. after a close() timeout, which would mask a failure from the try block; confirm
+ }
+ }
+
private def assertDescribedGroup(
groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
groupId: String,