This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56a6ba2d2e5 MINOR: Add retention prop to share group state topic.
(#20013)
56a6ba2d2e5 is described below
commit 56a6ba2d2e5b9b109782eaa39184a9eedf684d26
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Jun 23 02:32:35 2025 +0530
MINOR: Add retention prop to share group state topic. (#20013)
* KIP-932
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka)
specifies that the `retention.ms` property of the `__share_group_state`
topic is `-1`.
* This PR sets `retention.ms` explicitly when defining the topic's configs.
* The existing test has been updated.
```
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
--topic __share_group_state
Topic: __share_group_state TopicId: XCwzZjEGSjm5lUc_BeCrqA
PartitionCount: 50 ReplicationFactor: 1
Configs:
compression.type=producer,
min.insync.replicas=1,
cleanup.policy=delete,
segment.bytes=104857600,
retention.ms=-1
...
```
Reviewers: Andrew Schofield <[email protected]>
---
.../org/apache/kafka/coordinator/share/ShareCoordinatorService.java | 4 +++-
.../apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java | 6 ++++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index e26aac124ff..44315418c66 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -253,10 +253,12 @@ public class ShareCoordinatorService implements
ShareCoordinator {
@Override
public Properties shareGroupStateTopicConfigs() {
Properties properties = new Properties();
- properties.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
+ // As defined in KIP-932.
+ properties.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE);
properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.PRODUCER.name);
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG,
config.shareCoordinatorStateTopicSegmentBytes());
properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
config.shareCoordinatorStateTopicMinIsr());
+ properties.put(TopicConfig.RETENTION_MS_CONFIG, -1);
return properties;
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 3445e241eeb..f7c6e32ceb3 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -2158,10 +2158,12 @@ class ShareCoordinatorServiceTest {
TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.COMPRESSION_TYPE_CONFIG,
TopicConfig.SEGMENT_BYTES_CONFIG,
- TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
+ TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ TopicConfig.RETENTION_MS_CONFIG
);
Properties actual = service.shareGroupStateTopicConfigs();
- propNames.forEach(actual::contains);
+ propNames.forEach(actual::remove);
+ assertTrue(actual.isEmpty());
service.shutdown();
}