This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44989983e4d MINOR: Add integration test for offset deletion on topic deletion (#21252)
44989983e4d is described below

commit 44989983e4d200adab7cbd9c2736d4e02804fa82
Author: David Jacot <[email protected]>
AuthorDate: Tue Jan 6 19:29:59 2026 +0100

    MINOR: Add integration test for offset deletion on topic deletion (#21252)
    
    This patch adds an integration test to verify that committed offsets are
    deleted when a topic is deleted.
    
    Reviewers: Sean Quah <[email protected]>, Dongnuo Lyu
     <[email protected]>, Lianet Magrans <[email protected]>
---
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 87 ++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 75bf82ef155..f838bb42d9e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.server
 
+import kafka.utils.TestUtils
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.{OffsetFetchRequestData, OffsetFetchResponseData}
@@ -648,6 +649,92 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     }
   }
 
+  @ClusterTest
+  def testCommittedOffsetsDeletedWhenTopicDeleted(): Unit = {
+    // This test verifies that committed offsets are deleted when a topic is deleted.
+    // When a topic is deleted, GroupCoordinatorService.onMetadataUpdate is called which
+    // schedules the deletion of all committed offsets for the deleted topic partitions.
+
+    createOffsetsTopic()
+
+    // Create the topic.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must use
+    // a session long enough for the duration of the test.
+    val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol = true)
+
+    // Commit offsets.
+    for (partitionId <- 0 to 2) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "foo",
+        topicId = topicId,
+        partition = partitionId,
+        offset = 100L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+
+    // Verify that the offsets are committed.
+    assertEquals(
+      new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("grp")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(101L),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(102L)
+            ).asJava)
+        ).asJava),
+      fetchOffsets(
+        group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+          .setGroupId("grp")
+          .setMemberId(memberId)
+          .setMemberEpoch(memberEpoch)
+          .setTopics(null),
+        requireStable = false,
+        version = 9
+      )
+    )
+
+    // Delete the topic.
+    deleteTopic("foo")
+
+    // Wait until the offsets are deleted. The offsets should be deleted
+    // when the topic deletion is processed by onMetadataUpdate.
+    TestUtils.waitUntilTrue(
+      () => {
+        fetchOffsets(
+          group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+            .setGroupId("grp")
+            .setMemberId("")
+            .setMemberEpoch(-1)
+            .setTopics(null),
+          requireStable = false,
+          // Force using version 9 (and topic names) because unknown topic ids
+          // are filtered out.
+          version = 9
+        ).topics.isEmpty
+      },
+      msg = "Offsets should be deleted after topic deletion."
+    )
+  }
+
   @ClusterTest
   def testGroupErrors(): Unit = {
     val topicId = createTopic(

Reply via email to