This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 288b4799eea KAFKA-19655: Align the behavior of num.partitions and 
default.replication.factor for topic creation (#21550)
288b4799eea is described below

commit 288b4799eeac959e9f5b8691cdb4adb25b23d1c3
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Mar 4 07:43:36 2026 +0800

    KAFKA-19655: Align the behavior of num.partitions and 
default.replication.factor for topic creation (#21550)
    
    Currently, 'num.partitions' and 'default.replication.factor' are applied
    inconsistently. Topic auto-creation relies on Broker configs, while
    Streams and AdminClient rely on Controller configs. This leads to
    confusing behavior where a Broker and Controller might have diverging
    defaults.
    
    This commit implements the 4.x transition phase:
    - Updated DefaultAutoTopicCreationManager to check if these configs are
      explicitly set in 'broker.properties'.
    - If NOT explicitly set, the Broker now sends 'NO_NUM_PARTITIONS' and
      'NO_REPLICATION_FACTOR' (-1) in the CreateTopicsRequest, allowing
      the Controller's configuration to take precedence.
    - Added deprecation warnings in KafkaConfig when these properties are
      defined in a Broker role, notifying users to migrate them to the
      Controller role before 5.0.
    - Updated documentation to clarify the precedence logic between
      Broker and Controller nodes.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../kafka/clients/admin/AutoTopicCreationTest.java | 112 +++++++++++++++++++++
 .../kafka/server/AutoTopicCreationManager.scala    |  12 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   8 ++
 .../server/AutoTopicCreationManagerTest.scala      |  12 +--
 .../kafka/server/config/ServerLogConfigs.java      |  11 +-
 .../kafka/server/config/ReplicationConfigs.java    |  12 ++-
 6 files changed, 156 insertions(+), 11 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
new file mode 100644
index 00000000000..9417de1d07a
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AutoTopicCreationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ClusterTestDefaults(types = {Type.KRAFT}, brokers = 2)
+public class AutoTopicCreationTest {
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(id = 0, key = "num.partitions", value = 
"5"),
+            @ClusterConfigProperty(id = 0, key = "default.replication.factor", 
value = "2"),
+            @ClusterConfigProperty(id = 1, key = "num.partitions", value = 
"5"),
+            @ClusterConfigProperty(id = 1, key = "default.replication.factor", 
value = "2"),
+        }
+    )
+    public void testAutoCreateTopicWithExplicitBrokerConfig(ClusterInstance 
cluster) throws Exception {
+        String topic = "explicit-broker-topic";
+        triggerAutoCreateTopic(cluster, topic);
+        try (Admin admin = cluster.admin()) {
+            TopicDescription desc = 
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+            assertEquals(5, desc.partitions().size(),
+                "num.partitions explicitly set on broker should be used");
+            assertEquals(2, desc.partitions().get(0).replicas().size(),
+                "default.replication.factor explicitly set on broker should be 
used");
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(id = 3000, key = "num.partitions", value = 
"5"),
+            @ClusterConfigProperty(id = 3000, key = 
"default.replication.factor", value = "2"),
+        }
+    )
+    public void testAutoCreateTopicWithImplicitBrokerConfig(ClusterInstance 
cluster) throws Exception {
+        String topic = "implicit-broker-topic";
+        triggerAutoCreateTopic(cluster, topic);
+        try (Admin admin = cluster.admin()) {
+            TopicDescription desc = 
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+            assertEquals(5, desc.partitions().size(),
+                "Controller num.partitions should be used when broker does not 
explicitly set it");
+            assertEquals(2, desc.partitions().get(0).replicas().size(),
+                "Controller default.replication.factor should be used when 
broker does not explicitly set it");
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(id = 0, key = "num.partitions", value = 
"5"),
+            @ClusterConfigProperty(id = 1, key = "num.partitions", value = 
"5"),
+            @ClusterConfigProperty(id = 3000, key = 
"default.replication.factor", value = "2"),
+        }
+    )
+    public void testAutoCreateTopicWithMixedConfig(ClusterInstance cluster) 
throws Exception {
+        String topic = "mixed-config-topic";
+        triggerAutoCreateTopic(cluster, topic);
+        try (Admin admin = cluster.admin()) {
+            TopicDescription desc = 
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+            assertEquals(5, desc.partitions().size(),
+                "num.partitions explicitly set on broker should be used");
+            assertEquals(2, desc.partitions().get(0).replicas().size(),
+                "Controller default.replication.factor should be used when 
broker does not set it");
+        }
+    }
+
+    @ClusterTest
+    public void testAutoCreateTopicWithDefaultConfig(ClusterInstance cluster) 
throws Exception {
+        String topic = "default-config-topic";
+        triggerAutoCreateTopic(cluster, topic);
+        try (Admin admin = cluster.admin()) {
+            TopicDescription desc = 
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
+            assertEquals(1, desc.partitions().size(),
+                "Default num.partitions of 1 should be used when neither 
broker nor controller sets it");
+            assertEquals(1, desc.partitions().get(0).replicas().size(),
+                "Default default.replication.factor of 1 should be used when 
neither broker nor controller sets it");
+        }
+    }
+
+    private void triggerAutoCreateTopic(ClusterInstance cluster, String topic) 
throws Exception {
+        // Sends a produce request to a non-existent topic so that auto topic 
creation is triggered.
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            ProducerRecord<byte[], byte[]> record =
+                new ProducerRecord<>(topic, null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record).get();
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 6f2a192a438..2d81c3694bf 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.requests.{CreateTopicsRequest, 
CreateTopicsRespon
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.quota.ControllerMutationQuota
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.TopicCreator
@@ -270,10 +271,17 @@ class DefaultAutoTopicCreationManager(
           
.setReplicationFactor(config.shareCoordinatorConfig.shareCoordinatorStateTopicReplicationFactor())
           
.setConfigs(convertToTopicConfigCollections(shareCoordinator.shareGroupStateTopicConfigs()))
       case topicName =>
+        val numPartitions: java.lang.Integer =
+          if 
(config.originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG)) 
config.numPartitions
+          else CreateTopicsRequest.NO_NUM_PARTITIONS
+        val replicationFactor: java.lang.Short =
+          if 
(config.originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG))
 config.defaultReplicationFactor.toShort
+          else CreateTopicsRequest.NO_REPLICATION_FACTOR
+
         new CreatableTopic()
           .setName(topicName)
-          .setNumPartitions(config.numPartitions)
-          .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+          .setNumPartitions(numPartitions)
+          .setReplicationFactor(replicationFactor)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c7beff0504b..0e1a3678227 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -584,6 +584,14 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       // warn if create.topic.policy.class.name or 
alter.config.policy.class.name is defined in the broker role
       warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, 
ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
       warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole, 
ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
+      if (originals.containsKey(ServerLogConfigs.NUM_PARTITIONS_CONFIG)) {
+        warn(s"${ServerLogConfigs.NUM_PARTITIONS_CONFIG} is defined in the 
broker role. This configuration will be ignored in 5.0. " +
+          s"Please set ${ServerLogConfigs.NUM_PARTITIONS_CONFIG} in the 
controller role instead.")
+      }
+      if 
(originals.containsKey(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)) {
+        warn(s"${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG} is 
defined in the broker role. This configuration will be ignored in 5.0. " +
+          s"Please set ${ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG} 
in the controller role instead.")
+      }
     } else if (processRoles == Set(ProcessRole.ControllerRole)) {
       // KRaft controller-only
       validateQuorumVotersAndQuorumBootstrapServerForKRaft()
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 5deaa94306b..5b436c627b1 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -125,13 +125,13 @@ class AutoTopicCreationManagerTest {
     val props = TestUtils.createBrokerConfig(1)
     props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeout.toString)
 
-    
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
-    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
-    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG 
, internalTopicPartitions.toString)
+    
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicReplicationFactor.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicReplicationFactor.toString)
+    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG 
, internalTopicReplicationFactor.toString)
 
-    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
-    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
-    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
+    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
internalTopicPartitions.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicPartitions.toString)
+    
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, 
internalTopicPartitions.toString)
 
     config = KafkaConfig.fromProps(props)
     val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1, 
"host1", 1))
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index dbfe8b13eed..d8ffd8a5e2f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -34,7 +34,16 @@ import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFI
 public class ServerLogConfigs {
     public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
     public static final int NUM_PARTITIONS_DEFAULT = 1;
-    public static final String NUM_PARTITIONS_DOC = "The default number of log 
partitions per topic";
+    public static final String NUM_PARTITIONS_DOC =
+        "The default number of log partitions per topic. This configuration 
affects the following paths:"
+        + "<ul>"
+        + "  <li>1. Auto topic creation</li>"
+        + "  <li>2. Internal streams topic creation</li>"
+        + "  <li>3. Topic creation via <code>AdminClient#createTopics</code> 
when the number of partition is set to -1</li>"
+        + "</ul>"
+        + "<p>For (1), the value from the broker configuration is used only 
when it is explicitly set. "
+        + "If it is not explicitly configured on the broker, the value from 
the controller configuration is used.<br/>"
+        + "For (2) and (3), the value from the controller configuration is 
always used.</p>";
 
     public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
     public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
index d39a141a458..664eae9448e 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
@@ -40,8 +40,16 @@ public class ReplicationConfigs {
 
     public static final String DEFAULT_REPLICATION_FACTOR_CONFIG = 
"default.replication.factor";
     public static final int REPLICATION_FACTOR_DEFAULT = 1;
-    public static final String DEFAULT_REPLICATION_FACTOR_DOC = "The 
replication factor for automatically created topics," +
-            " and for topics created with -1 as the replication factor";
+    public static final String DEFAULT_REPLICATION_FACTOR_DOC =
+        "The default replication factor per topic. This configuration affects 
the following paths:"
+        + "<ul>"
+        + "  <li>1. Auto topic creation</li>"
+        + "  <li>2. Internal streams topic creation</li>"
+        + "  <li>3. Topic creation via <code>AdminClient#createTopics</code> 
when the replication factor is set to -1</li>"
+        + "</ul>"
+        + "<p>For (1), the value from the broker configuration is used only 
when it is explicitly set. "
+        + "If it is not explicitly configured on the broker, the value from 
the controller configuration is used.<br/>"
+        + "For (2) and (3), the value from the controller configuration is 
always used.</p>";
 
     public static final String REPLICA_LAG_TIME_MAX_MS_CONFIG = 
"replica.lag.time.max.ms";
     public static final long REPLICA_LAG_TIME_MAX_MS_DEFAULT = 30000L;

Reply via email to