This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e33fe783b3 KAFKA-19740 Deprecate group.coordinator.rebalance.protocols config (KIP-1237) (#21522)
5e33fe783b3 is described below

commit 5e33fe783b3a18b2be1af1c9e4831809a21e0a77
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Tue Mar 10 16:16:16 2026 +0800

    KAFKA-19740 Deprecate group.coordinator.rebalance.protocols config (KIP-1237) (#21522)
    
    Implement [KIP-1237](https://cwiki.apache.org/confluence/x/jIqmFw).
    
    - Deprecate the `group.coordinator.rebalance.protocols` configuration
    for removal in Kafka 5.0.
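    - Log a deprecation warning whenever the configuration is explicitly
    set; if any default protocol (classic, consumer, streams) is disabled,
    the warning also lists the disabled protocols. In both cases users are
    advised to remove the configuration.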
    - Guard warnings with `doLog` to avoid duplicate output during startup
    - Clean up redundant `group.coordinator.rebalance.protocols` settings in
    test files — `"classic,consumer,streams"` is already the default, and
    `share` is a no-op (controlled by `share.version` feature)
    
    Test result:
    
    ```
    # group.coordinator.rebalance.protocols=classic,consumer,streams
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. Please remove the configuration to
    prepare for the upgrade. (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=classic,consumer
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. The following protocol(s) are
    currently disabled: streams. In Kafka 5.0, all protocols will always be
    enabled and controlled solely by feature versions (group.version,
    streams.version, share.version) via kafka-features.sh. Please remove the
    configuration, which will restore all protocols to the default enabled
    state, to prepare for the upgrade. (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=classic,streams
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. The following protocol(s) are
    currently disabled: consumer. In Kafka 5.0, all protocols will always be
    enabled and controlled solely by feature versions (group.version,
    streams.version, share.version) via kafka-features.sh. Please remove the
    configuration, which will restore all protocols to the default enabled
    state, to prepare for the upgrade. (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=classic
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. The following protocol(s) are
    currently disabled: consumer, streams. In Kafka 5.0, all protocols will
    always be enabled and controlled solely by feature versions
    (group.version, streams.version, share.version) via kafka-features.sh.
    Please remove the configuration, which will restore all protocols to the
    default enabled state, to prepare for the upgrade.
    (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=classic,consumer,streams,share
    WARN 'share' in `group.coordinator.rebalance.protocols` is deprecated.
    Share groups are controlled by the 'share.version' feature. This config
    will be removed in Kafka 5.0. (kafka.server.KafkaConfig)
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. Please remove the configuration to
    prepare for the upgrade. (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=classic,consumer,share
    WARN 'share' in `group.coordinator.rebalance.protocols` is deprecated.
    Share groups are controlled by the 'share.version' feature. This config
    will be removed in Kafka 5.0. (kafka.server.KafkaConfig)
    WARN The config `group.coordinator.rebalance.protocols` is deprecated
    and will be removed in Kafka 5.0. The following protocol(s) are
    currently disabled: streams. In Kafka 5.0, all protocols will always be
    enabled and controlled solely by feature versions (group.version,
    streams.version, share.version) via kafka-features.sh. Please remove the
    configuration, which will restore all protocols to the default enabled
    state, to prepare for the upgrade. (kafka.server.KafkaConfig)
    
    # group.coordinator.rebalance.protocols=consumer
    ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
    org.apache.kafka.common.config.ConfigException: Disabling the 'classic'
    protocol is not supported.
    ```
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../consumer/ShareConsumerRackAwareTest.java       |  1 -
 .../kafka/clients/consumer/ShareConsumerTest.java  |  1 -
 core/src/main/scala/kafka/server/KafkaConfig.scala | 22 ++++++++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 49 ++++++++++++++++++++++
 .../server/StreamsGroupHeartbeatRequestTest.scala  | 10 +----
 docs/getting-started/upgrade.md                    |  1 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  8 +++-
 .../test/junit/ClusterTestExtensionsTest.java      |  1 +
 .../org/apache/kafka/tools/GroupsCommandTest.java  |  1 -
 .../tools/streams/DescribeStreamsGroupTest.java    |  3 +-
 .../kafka/tools/streams/ListStreamsGroupTest.java  |  2 -
 11 files changed, 79 insertions(+), 20 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 4e60b0e12cc..488f8b75494 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -49,7 +49,6 @@ public class ShareConsumerRackAwareTest {
             @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
             @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
             @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
-            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic, share"),
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor")
         }
     )
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 8a08a046587..7ca1b5b6e8b 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2362,7 +2362,6 @@ public class ShareConsumerTest {
         brokers = 1,
         serverProperties = {
             @ClusterConfigProperty(key = "auto.create.topics.enable", value = 
"false"),
-            @ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
             @ClusterConfigProperty(key = "group.share.enable", value = "true"),
             @ClusterConfigProperty(key = 
"group.share.partition.max.record.locks", value = "10000"),
             @ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "15000"),
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0e1a3678227..9081eb058cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -358,9 +358,25 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     if (!protocols.contains(GroupType.CLASSIC)) {
       throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
     }
-    if (protocols.contains(GroupType.SHARE)) {
-      warn(s"'${GroupType.SHARE}' in 
${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG} is 
deprecated. " +
-        s"Share groups are controlled by the 'share.version' feature; this 
broker config will be ignored in a future release.")
+    if (doLog && protocols.contains(GroupType.SHARE)) {
+      warn(s"'${GroupType.SHARE}' in 
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is 
deprecated. " +
+        s"Share groups are controlled by the 'share.version' feature. " +
+        s"This config will be removed in Kafka 5.0.")
+    }
+    if (doLog && 
originals().containsKey(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG))
 {
+      val defaultProtocols = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT
+        .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
+      val missingProtocols = defaultProtocols -- protocols
+      if (missingProtocols.nonEmpty) {
+        warn(s"The config 
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is 
deprecated and will be removed in Kafka 5.0. " +
+          s"The following protocol(s) are currently disabled: 
${missingProtocols.mkString(", ")}. " +
+          s"In Kafka 5.0, all protocols will always be enabled and controlled 
solely by feature versions " +
+          s"(group.version, streams.version, share.version) via 
kafka-features.sh. " +
+          s"Please remove the configuration, which will restore all protocols 
to the default enabled state, to prepare for the upgrade.")
+      } else {
+        warn(s"The config 
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is 
deprecated and will be removed in Kafka 5.0. " +
+          s"Please remove the configuration to prepare for the upgrade.")
+      }
     }
     protocols
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ead46d36018..e726dbe3e72 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1827,6 +1827,55 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), 
config3.groupCoordinatorRebalanceProtocols)
   }
 
+  @Test
+  def testGroupCoordinatorRebalanceProtocolsDeprecationWarning(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+
+    val configName = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
+    val defaultDeprecationWarning =
+      s"The config `$configName` is deprecated and will be removed in Kafka 
5.0. " +
+      "Please remove the configuration to prepare for the upgrade."
+    def deprecationWarning(disabled: String): String =
+      s"The config `$configName` is deprecated and will be removed in Kafka 
5.0. " +
+      s"The following protocol(s) are currently disabled: $disabled. " +
+      "In Kafka 5.0, all protocols will always be enabled and controlled 
solely by feature versions " +
+      "(group.version, streams.version, share.version) via kafka-features.sh. 
" +
+      "Please remove the configuration, which will restore all protocols to 
the default enabled state, to prepare for the upgrade."
+    val shareDeprecationWarning = s"'share' in `$configName` is deprecated. " +
+      "Share groups are controlled by the 'share.version' feature. " +
+      "This config will be removed in Kafka 5.0."
+
+    Using.resource(LogCaptureAppender.createAndRegister) { appender =>
+      appender.setClassLogger(classOf[KafkaConfig], Level.WARN)
+
+      // Config not set: no warning.
+      KafkaConfig.fromProps(props)
+      assertTrue(appender.getMessages.isEmpty)
+
+      // Explicitly set to default value: simple deprecation warning.
+      props.put(configName, "classic,consumer,streams")
+      KafkaConfig.fromProps(props)
+      assertTrue(appender.getMessages.contains(defaultDeprecationWarning))
+      appender.getMessages.clear()
+
+      // Missing streams.
+      props.put(configName, "classic,consumer")
+      KafkaConfig.fromProps(props)
+      assertTrue(appender.getMessages.contains(deprecationWarning("streams")))
+
+      // Missing consumer.
+      props.put(configName, "classic,streams")
+      KafkaConfig.fromProps(props)
+      assertTrue(appender.getMessages.contains(deprecationWarning("consumer")))
+
+      // Including "share": no-op warning.
+      props.put(configName, "classic,consumer,streams,share")
+      KafkaConfig.fromProps(props)
+      assertTrue(appender.getMessages.contains(shareDeprecationWarning))
+    }
+  }
+
   @Test
   def testConsumerGroupMigrationPolicy(): Unit = {
     val props = new Properties()
diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 90331fa4993..bb4f2d220d8 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -35,17 +35,12 @@ import scala.jdk.CollectionConverters._
   serverProperties = Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams"),
     new ClusterConfigProperty(key = 
"group.streams.initial.rebalance.delay.ms", value = "0")
   )
 )
 class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
 
-  @ClusterTest(
-    serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,streams"),
-    )
-  )
+  @ClusterTest
   def testStreamsGroupHeartbeatWithInvalidAPIVersion(): Unit = {
     // Test that invalid API version throws UnsupportedVersionException
     assertThrows(classOf[UnsupportedVersionException], () =>
@@ -57,9 +52,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
   }
 
   @ClusterTest(
-    serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,streams"),
-    ),
     features = Array(
       new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
     )
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 56f42e4315e..3119637d5d4 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -38,6 +38,7 @@ type: docs
   * The new config prefix `remote.log.metadata.admin.` has been introduced. It 
allows independent configuration of the admin client used by 
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to 
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
   * The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will 
be removed in Kafka 5.0. For further details, please refer to the [migration 
guide](/{version}/streams/developer-guide/scala-migration).
   * Support for cordoning log directories: For further details, please refer 
to [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg).
+  * The `group.coordinator.rebalance.protocols` configuration is deprecated 
and will be removed in Kafka 5.0. In Kafka 5.0, all protocols will always be 
enabled and controlled solely by feature versions (`group.version`, 
`streams.version`, `share.version`) via `kafka-features.sh`. For further 
details, please refer to 
[KIP-1237](https://cwiki.apache.org/confluence/x/jIqmFw).
 
 ## Upgrading to 4.2.0
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 68d3ecb8fe3..5a55748e334 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -65,8 +65,14 @@ public class GroupCoordinatorConfig {
     ///
     /// Group coordinator configs
     ///
+    @Deprecated(since = "4.3", forRemoval = true)
     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
-    public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols.";
+    @Deprecated(since = "4.3", forRemoval = true)
+    public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"This configuration is deprecated and will be removed in Kafka 5.0. " +
+        "The list of enabled rebalance protocols. " +
+        "In Kafka 5.0, all protocols will always be enabled and cannot be 
disabled via this configuration. " +
+        "Use feature versions (group.version, streams.version, share.version) 
managed by kafka-features.sh instead.";
+    @Deprecated(since = "4.3", forRemoval = true)
     public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
         Group.GroupType.CLASSIC.toString(),
         Group.GroupType.CONSUMER.toString(),
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index bb68b18bfa7..ac6c05e1151 100644
--- 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -232,6 +232,7 @@ public class ClusterTestExtensionsTest {
         assertEquals(supportedGroupProtocols, 
clusterInstance.supportedGroupProtocols());
     }
 
+    @SuppressWarnings("removal")
     @ClusterTests({
         @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index a17256908fb..3759cd75164 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -466,7 +466,6 @@ public class GroupsCommandTest {
     @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
     @ClusterTest(
         serverProperties = {
-            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,streams"),
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 3915f2e73b7..354118397ac 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -73,11 +72,11 @@ public class DescribeStreamsGroupTest {
     private static final String INPUT_TOPIC_2 = "customInputTopic2";
     private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
     private static String bootstrapServers;
+
     @BeforeAll
     public static void setup() throws Exception {
         // start the cluster and create the input topic
         final Properties props = new Properties();
-        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
         cluster = new EmbeddedKafkaCluster(1, props);
         cluster.start();
         cluster.createTopic(INPUT_TOPIC, 2, 1);
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java 
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
index ecbab7b7269..151cb570694 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -72,7 +71,6 @@ public class ListStreamsGroupTest {
     public static void setup() throws Exception {
         // start the cluster and create the input topic
         final Properties props = new Properties();
-        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
         cluster = new EmbeddedKafkaCluster(1, props);
         cluster.start();
         cluster.createTopic(INPUT_TOPIC, 2, 1);

Reply via email to