This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5235e11d4d5 KAFKA-18809 Set min in sync replicas for 
__share_group_state. (#18922)
5235e11d4d5 is described below

commit 5235e11d4d54d3b06daac434aa9c8a631a73e3e3
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Mon Feb 17 01:52:48 2025 +0530

    KAFKA-18809 Set min in sync replicas for __share_group_state. (#18922)
    
    - The share.coordinator.state.topic.min.isr config defined in 
ShareCoordinatorConfig was not being used in the AutoTopicCreationManager.
    - The AutoTopicCreationManager calls the 
ShareCoordinatorService.shareGroupStateTopicConfigs to configs for the topic to 
create.
    - The method ShareCoordinatorService.shareGroupStateTopicConfigs was not 
setting the supplied config value for share.coordinator.state.topic.min.isr to 
min.insync.replicas.
    - In this PR, we remedy the situation by setting the value
    - A test has been added to ShareCoordinatorServiceTest so that this is not 
repeated for any configs.
    
    Reviewers: poorv Mittal <apoorvmitta...@gmail.com>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../coordinator/share/ShareCoordinatorService.java |  1 +
 .../share/ShareCoordinatorServiceTest.java         | 32 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 3139e2ef07d..18b7f0bf051 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -235,6 +235,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
         properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.PRODUCER.name);
         properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
config.shareCoordinatorStateTopicSegmentBytes());
+        properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
config.shareCoordinatorStateTopicMinIsr());
         return properties;
     }
 
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 331595119b6..8738db4dd9a 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
@@ -55,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -1643,6 +1645,36 @@ class ShareCoordinatorServiceTest {
         service.shutdown();
     }
 
+    @Test
+    public void testShareStateTopicConfigs() {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        MockTime time = new MockTime();
+        MockTimer timer = new MockTimer(time);
+        PartitionWriter writer = mock(PartitionWriter.class);
+
+        Metrics metrics = new Metrics();
+        ShareCoordinatorService service = spy(new ShareCoordinatorService(
+            new LogContext(),
+            ShareCoordinatorTestConfig.testConfig(),
+            runtime,
+            new ShareCoordinatorMetrics(metrics),
+            time,
+            timer,
+            writer
+        ));
+
+        List<String> propNames = List.of(
+            TopicConfig.CLEANUP_POLICY_CONFIG,
+            TopicConfig.COMPRESSION_TYPE_CONFIG,
+            TopicConfig.SEGMENT_BYTES_CONFIG,
+            TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
+        );
+        Properties actual = service.shareGroupStateTopicConfigs();
+        propNames.forEach(actual::contains);
+
+        service.shutdown();
+    }
+
     private void checkMetrics(Metrics metrics) {
         Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
             metrics.metricName("write-latency-avg", 
ShareCoordinatorMetrics.METRICS_GROUP),

Reply via email to