This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56a6ba2d2e5 MINOR: Add retention prop to share group state topic. (#20013)
56a6ba2d2e5 is described below

commit 56a6ba2d2e5b9b109782eaa39184a9eedf684d26
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Jun 23 02:32:35 2025 +0530

    MINOR: Add retention prop to share group state topic. (#20013)
    
    * KIP-932
    (https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka)
    specifies the `retention.ms` property of the `__share_group_state`
    topic as `-1`.
    * This PR sets `retention.ms` to `-1` explicitly in the topic configs
    returned by `shareGroupStateTopicConfigs()`.
    * The existing test has been updated to expect the new property and to
    assert that no unexpected properties are present.
    
    ```
    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
    --topic __share_group_state
    
    Topic: __share_group_state      TopicId: XCwzZjEGSjm5lUc_BeCrqA
    PartitionCount: 50      ReplicationFactor: 1
    Configs:
    compression.type=producer,
    min.insync.replicas=1,
    cleanup.policy=delete,
    segment.bytes=104857600,
    retention.ms=-1
    ...
    ```
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../org/apache/kafka/coordinator/share/ShareCoordinatorService.java | 4 +++-
 .../apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java | 6 ++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index e26aac124ff..44315418c66 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -253,10 +253,12 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     @Override
     public Properties shareGroupStateTopicConfigs() {
         Properties properties = new Properties();
-        properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
+        // As defined in KIP-932.
+        properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
         properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.PRODUCER.name);
         properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
config.shareCoordinatorStateTopicSegmentBytes());
         properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
config.shareCoordinatorStateTopicMinIsr());
+        properties.put(TopicConfig.RETENTION_MS_CONFIG, -1);
         return properties;
     }
 
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 3445e241eeb..f7c6e32ceb3 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -2158,10 +2158,12 @@ class ShareCoordinatorServiceTest {
             TopicConfig.CLEANUP_POLICY_CONFIG,
             TopicConfig.COMPRESSION_TYPE_CONFIG,
             TopicConfig.SEGMENT_BYTES_CONFIG,
-            TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
+            TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+            TopicConfig.RETENTION_MS_CONFIG
         );
         Properties actual = service.shareGroupStateTopicConfigs();
-        propNames.forEach(actual::contains);
+        propNames.forEach(actual::remove);
+        assertTrue(actual.isEmpty());
 
         service.shutdown();
     }

Reply via email to