This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e33fe783b3 KAFKA-19740 Deprecate
group.coordinator.rebalance.protocols config (KIP-1237) (#21522)
5e33fe783b3 is described below
commit 5e33fe783b3a18b2be1af1c9e4831809a21e0a77
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Tue Mar 10 16:16:16 2026 +0800
KAFKA-19740 Deprecate group.coordinator.rebalance.protocols config
(KIP-1237) (#21522)
Implement [KIP-1237](https://cwiki.apache.org/confluence/x/jIqmFw).
- Deprecate the `group.coordinator.rebalance.protocols` configuration
for removal in Kafka 5.0.
- Log a deprecation warning when users disable any default protocol
(classic, consumer, streams), advising them to remove the configuration
- Guard warnings with `doLog` to avoid duplicate output during startup
- Clean up redundant `group.coordinator.rebalance.protocols` settings in
test files — `"classic,consumer,streams"` is already the default, and
`share` is a no-op (controlled by `share.version` feature)
Test result:
```
# group.coordinator.rebalance.protocols=classic,consumer,streams
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. Please remove the configuration to
prepare for the upgrade. (kafka.server.KafkaConfig)
# =classic,consumer
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. The following protocol(s) are
currently disabled: streams. In Kafka 5.0, all protocols will always be
enabled and controlled solely by feature versions (group.version,
streams.version, share.version) via kafka-features.sh. Please remove the
configuration, which will restore all protocols to the default enabled
state, to prepare for the upgrade. (kafka.server.KafkaConfig)
# =classic,streams
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. The following protocol(s) are
currently disabled: consumer. In Kafka 5.0, all protocols will always be
enabled and controlled solely by feature versions (group.version,
streams.version, share.version) via kafka-features.sh. Please remove the
configuration, which will restore all protocols to the default enabled
state, to prepare for the upgrade. (kafka.server.KafkaConfig)
# =classic
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. The following protocol(s) are
currently disabled: consumer, streams. In Kafka 5.0, all protocols will
always be enabled and controlled solely by feature versions
(group.version, streams.version, share.version) via kafka-features.sh.
Please remove the configuration, which will restore all protocols to the
default enabled state, to prepare for the upgrade.
(kafka.server.KafkaConfig)
# =classic,consumer,streams,share
WARN 'share' in `group.coordinator.rebalance.protocols` is deprecated.
Share groups are controlled by the 'share.version' feature. This config
will be removed in Kafka 5.0. (kafka.server.KafkaConfig)
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. Please remove the configuration to
prepare for the upgrade. (kafka.server.KafkaConfig)
# =classic,consumer,share
WARN 'share' in `group.coordinator.rebalance.protocols` is deprecated.
Share groups are controlled by the 'share.version' feature. This config
will be removed in Kafka 5.0. (kafka.server.KafkaConfig)
WARN The config `group.coordinator.rebalance.protocols` is deprecated
and will be removed in Kafka 5.0. The following protocol(s) are
currently disabled: streams. In Kafka 5.0, all protocols will always be
enabled and controlled solely by feature versions (group.version,
streams.version, share.version) via kafka-features.sh. Please remove the
configuration, which will restore all protocols to the default enabled
state, to prepare for the upgrade. (kafka.server.KafkaConfig)
# =consumer
ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
org.apache.kafka.common.config.ConfigException: Disabling the 'classic'
protocol is not supported.
```
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/ShareConsumerRackAwareTest.java | 1 -
.../kafka/clients/consumer/ShareConsumerTest.java | 1 -
core/src/main/scala/kafka/server/KafkaConfig.scala | 22 ++++++++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 49 ++++++++++++++++++++++
.../server/StreamsGroupHeartbeatRequestTest.scala | 10 +----
docs/getting-started/upgrade.md | 1 +
.../coordinator/group/GroupCoordinatorConfig.java | 8 +++-
.../test/junit/ClusterTestExtensionsTest.java | 1 +
.../org/apache/kafka/tools/GroupsCommandTest.java | 1 -
.../tools/streams/DescribeStreamsGroupTest.java | 3 +-
.../kafka/tools/streams/ListStreamsGroupTest.java | 2 -
11 files changed, 79 insertions(+), 20 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 4e60b0e12cc..488f8b75494 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -49,7 +49,6 @@ public class ShareConsumerRackAwareTest {
@ClusterConfigProperty(id = 0, key = "broker.rack", value =
"rack0"),
@ClusterConfigProperty(id = 1, key = "broker.rack", value =
"rack1"),
@ClusterConfigProperty(id = 2, key = "broker.rack", value =
"rack2"),
- @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic, share"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value =
"org.apache.kafka.clients.consumer.RackAwareAssignor")
}
)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 8a08a046587..7ca1b5b6e8b 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2362,7 +2362,6 @@ public class ShareConsumerTest {
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value =
"false"),
- @ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
@ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key =
"group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "15000"),
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0e1a3678227..9081eb058cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -358,9 +358,25 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
if (!protocols.contains(GroupType.CLASSIC)) {
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}'
protocol is not supported.")
}
- if (protocols.contains(GroupType.SHARE)) {
- warn(s"'${GroupType.SHARE}' in
${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG} is
deprecated. " +
- s"Share groups are controlled by the 'share.version' feature; this
broker config will be ignored in a future release.")
+ if (doLog && protocols.contains(GroupType.SHARE)) {
+ warn(s"'${GroupType.SHARE}' in
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is
deprecated. " +
+ s"Share groups are controlled by the 'share.version' feature. " +
+ s"This config will be removed in Kafka 5.0.")
+ }
+ if (doLog &&
originals().containsKey(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG))
{
+ val defaultProtocols =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT
+ .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
+ val missingProtocols = defaultProtocols -- protocols
+ if (missingProtocols.nonEmpty) {
+ warn(s"The config
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is
deprecated and will be removed in Kafka 5.0. " +
+ s"The following protocol(s) are currently disabled:
${missingProtocols.mkString(", ")}. " +
+ s"In Kafka 5.0, all protocols will always be enabled and controlled
solely by feature versions " +
+ s"(group.version, streams.version, share.version) via
kafka-features.sh. " +
+ s"Please remove the configuration, which will restore all protocols
to the default enabled state, to prepare for the upgrade.")
+ } else {
+ warn(s"The config
`${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is
deprecated and will be removed in Kafka 5.0. " +
+ s"Please remove the configuration to prepare for the upgrade.")
+ }
}
protocols
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ead46d36018..e726dbe3e72 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1827,6 +1827,55 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE),
config3.groupCoordinatorRebalanceProtocols)
}
+ @Test
+ def testGroupCoordinatorRebalanceProtocolsDeprecationWarning(): Unit = {
+ val props = new Properties()
+ props.putAll(kraftProps())
+
+ val configName =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
+ val defaultDeprecationWarning =
+ s"The config `$configName` is deprecated and will be removed in Kafka
5.0. " +
+ "Please remove the configuration to prepare for the upgrade."
+ def deprecationWarning(disabled: String): String =
+ s"The config `$configName` is deprecated and will be removed in Kafka
5.0. " +
+ s"The following protocol(s) are currently disabled: $disabled. " +
+ "In Kafka 5.0, all protocols will always be enabled and controlled
solely by feature versions " +
+ "(group.version, streams.version, share.version) via kafka-features.sh.
" +
+ "Please remove the configuration, which will restore all protocols to
the default enabled state, to prepare for the upgrade."
+ val shareDeprecationWarning = s"'share' in `$configName` is deprecated. " +
+ "Share groups are controlled by the 'share.version' feature. " +
+ "This config will be removed in Kafka 5.0."
+
+ Using.resource(LogCaptureAppender.createAndRegister) { appender =>
+ appender.setClassLogger(classOf[KafkaConfig], Level.WARN)
+
+ // Config not set: no warning.
+ KafkaConfig.fromProps(props)
+ assertTrue(appender.getMessages.isEmpty)
+
+ // Explicitly set to default value: simple deprecation warning.
+ props.put(configName, "classic,consumer,streams")
+ KafkaConfig.fromProps(props)
+ assertTrue(appender.getMessages.contains(defaultDeprecationWarning))
+ appender.getMessages.clear()
+
+ // Missing streams.
+ props.put(configName, "classic,consumer")
+ KafkaConfig.fromProps(props)
+ assertTrue(appender.getMessages.contains(deprecationWarning("streams")))
+
+ // Missing consumer.
+ props.put(configName, "classic,streams")
+ KafkaConfig.fromProps(props)
+ assertTrue(appender.getMessages.contains(deprecationWarning("consumer")))
+
+ // Including "share": no-op warning.
+ props.put(configName, "classic,consumer,streams,share")
+ KafkaConfig.fromProps(props)
+ assertTrue(appender.getMessages.contains(shareDeprecationWarning))
+ }
+ }
+
@Test
def testConsumerGroupMigrationPolicy(): Unit = {
val props = new Properties()
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 90331fa4993..bb4f2d220d8 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -35,17 +35,12 @@ import scala.jdk.CollectionConverters._
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,streams"),
new ClusterConfigProperty(key =
"group.streams.initial.rebalance.delay.ms", value = "0")
)
)
class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
- @ClusterTest(
- serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
- )
- )
+ @ClusterTest
def testStreamsGroupHeartbeatWithInvalidAPIVersion(): Unit = {
// Test that invalid API version throws UnsupportedVersionException
assertThrows(classOf[UnsupportedVersionException], () =>
@@ -57,9 +52,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
}
@ClusterTest(
- serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
- ),
features = Array(
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 56f42e4315e..3119637d5d4 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -38,6 +38,7 @@ type: docs
* The new config prefix `remote.log.metadata.admin.` has been introduced. It
allows independent configuration of the admin client used by
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
* The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will
be removed in Kafka 5.0. For further details, please refer to the [migration
guide](/{version}/streams/developer-guide/scala-migration).
* Support for cordoning log directories: For further details, please refer
to [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg).
+ * The `group.coordinator.rebalance.protocols` configuration is deprecated
and will be removed in Kafka 5.0. In Kafka 5.0, all protocols will always be
enabled and controlled solely by feature versions (`group.version`,
`streams.version`, `share.version`) via `kafka-features.sh`. For further
details, please refer to
[KIP-1237](https://cwiki.apache.org/confluence/x/jIqmFw).
## Upgrading to 4.2.0
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 68d3ecb8fe3..5a55748e334 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -65,8 +65,14 @@ public class GroupCoordinatorConfig {
///
/// Group coordinator configs
///
+ @Deprecated(since = "4.3", forRemoval = true)
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG =
"group.coordinator.rebalance.protocols";
- public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC =
"The list of enabled rebalance protocols.";
+ @Deprecated(since = "4.3", forRemoval = true)
+ public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC =
"This configuration is deprecated and will be removed in Kafka 5.0. " +
+ "The list of enabled rebalance protocols. " +
+ "In Kafka 5.0, all protocols will always be enabled and cannot be
disabled via this configuration. " +
+ "Use feature versions (group.version, streams.version, share.version)
managed by kafka-features.sh instead.";
+ @Deprecated(since = "4.3", forRemoval = true)
public static final List<String>
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(),
Group.GroupType.CONSUMER.toString(),
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index bb68b18bfa7..ac6c05e1151 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -232,6 +232,7 @@ public class ClusterTestExtensionsTest {
assertEquals(supportedGroupProtocols,
clusterInstance.supportedGroupProtocols());
}
+ @SuppressWarnings("removal")
@ClusterTests({
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index a17256908fb..3759cd75164 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -466,7 +466,6 @@ public class GroupsCommandTest {
@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
@ClusterTest(
serverProperties = {
- @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,streams"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 3915f2e73b7..354118397ac 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@@ -73,11 +72,11 @@ public class DescribeStreamsGroupTest {
private static final String INPUT_TOPIC_2 = "customInputTopic2";
private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
private static String bootstrapServers;
+
@BeforeAll
public static void setup() throws Exception {
// start the cluster and create the input topic
final Properties props = new Properties();
-
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,streams");
cluster = new EmbeddedKafkaCluster(1, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC, 2, 1);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
index ecbab7b7269..151cb570694 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@@ -72,7 +71,6 @@ public class ListStreamsGroupTest {
public static void setup() throws Exception {
// start the cluster and create the input topic
final Properties props = new Properties();
-
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,streams");
cluster = new EmbeddedKafkaCluster(1, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC, 2, 1);