This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44989983e4d MINOR: Add integration test for offset deletion on topic
deletion (#21252)
44989983e4d is described below
commit 44989983e4d200adab7cbd9c2736d4e02804fa82
Author: David Jacot <[email protected]>
AuthorDate: Tue Jan 6 19:29:59 2026 +0100
MINOR: Add integration test for offset deletion on topic deletion (#21252)
This patch adds an integration test to verify that committed offsets are
deleted when a topic is deleted.
Reviewers: Sean Quah <[email protected]>, Dongnuo Lyu
<[email protected]>, Lianet Magrans <[email protected]>
---
.../unit/kafka/server/OffsetFetchRequestTest.scala | 87 ++++++++++++++++++++++
1 file changed, 87 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 75bf82ef155..f838bb42d9e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -16,6 +16,7 @@
*/
package kafka.server
+import kafka.utils.TestUtils
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.{OffsetFetchRequestData,
OffsetFetchResponseData}
@@ -648,6 +649,92 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
}
}
+  @ClusterTest
+  def testCommittedOffsetsDeletedWhenTopicDeleted(): Unit = {
+    // This test verifies that committed offsets are deleted when a topic is deleted.
+    // When a topic is deleted, GroupCoordinatorService.onMetadataUpdate is called which
+    // schedules the deletion of all committed offsets for the deleted topic partitions.
+
+    createOffsetsTopic()
+
+    // Create the topic.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must use
+    // a session long enough for the duration of the test.
+    val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol = true)
+
+    // Commit offsets.
+    for (partitionId <- 0 to 2) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "foo",
+        topicId = topicId,
+        partition = partitionId,
+        offset = 100L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+
+    // Verify that the offsets are committed.
+    assertEquals(
+      new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("grp")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(101L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(102L)
+            ).asJava)
+        ).asJava),
+      fetchOffsets(
+        group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+          .setGroupId("grp")
+          .setMemberId(memberId)
+          .setMemberEpoch(memberEpoch)
+          .setTopics(null),
+        requireStable = false,
+        version = 9
+      )
+    )
+
+    // Delete the topic.
+    deleteTopic("foo")
+
+    // Wait until the offsets are deleted. The offsets should be deleted
+    // when the topic deletion is processed by onMetadataUpdate.
+    TestUtils.waitUntilTrue(
+      () => {
+        fetchOffsets(
+          group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("grp")
+            .setMemberId("")
+            .setMemberEpoch(-1)
+            .setTopics(null),
+          requireStable = false,
+          // Force using version 9 (and topic names) because unknown topic ids
+          // are filtered out.
+          version = 9
+        ).topics.isEmpty
+      },
+      msg = "Offsets should be deleted after topic deletion."
+    )
+  }
+
@ClusterTest
def testGroupErrors(): Unit = {
val topicId = createTopic(