This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5235e11d4d5 KAFKA-18809 Set min in sync replicas for __share_group_state. (#18922) 5235e11d4d5 is described below commit 5235e11d4d54d3b06daac434aa9c8a631a73e3e3 Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Mon Feb 17 01:52:48 2025 +0530 KAFKA-18809 Set min in sync replicas for __share_group_state. (#18922) - The share.coordinator.state.topic.min.isr config defined in ShareCoordinatorConfig was not being used in the AutoTopicCreationManager. - The AutoTopicCreationManager calls the ShareCoordinatorService.shareGroupStateTopicConfigs to get the configs for the topic to create. - The method ShareCoordinatorService.shareGroupStateTopicConfigs was not setting the supplied config value for share.coordinator.state.topic.min.isr to min.insync.replicas. - In this PR, we remedy the situation by setting the value. - A test has been added to ShareCoordinatorServiceTest so that this is not repeated for any configs. Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../coordinator/share/ShareCoordinatorService.java | 1 + .../share/ShareCoordinatorServiceTest.java | 32 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index 3139e2ef07d..18b7f0bf051 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -235,6 +235,7 @@ public class ShareCoordinatorService implements ShareCoordinator { properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932 properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.PRODUCER.name); 
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, config.shareCoordinatorStateTopicSegmentBytes()); + properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, config.shareCoordinatorStateTopicMinIsr()); return properties; } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java index 331595119b6..8738db4dd9a 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -1643,6 +1645,36 @@ class ShareCoordinatorServiceTest { service.shutdown(); } + @Test + public void testShareStateTopicConfigs() { + CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + MockTime time = new MockTime(); + MockTimer timer = new MockTimer(time); + PartitionWriter writer = mock(PartitionWriter.class); + + Metrics metrics = new Metrics(); + ShareCoordinatorService service = spy(new ShareCoordinatorService( + new LogContext(), + ShareCoordinatorTestConfig.testConfig(), + runtime, + new ShareCoordinatorMetrics(metrics), + time, + timer, + writer + )); + + 
List<String> propNames = List.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.COMPRESSION_TYPE_CONFIG, + TopicConfig.SEGMENT_BYTES_CONFIG, + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG + ); + Properties actual = service.shareGroupStateTopicConfigs(); + propNames.forEach(actual::contains); + + service.shutdown(); + } + private void checkMetrics(Metrics metrics) { Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList( metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),