This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 9311a25c2f1 KAFKA-18886 add behavior change of CreateTopicPolicy and
AlterConfigPolicy to zk2kraft (#19087)
9311a25c2f1 is described below
commit 9311a25c2f14d429f6aa46f20cd75050a97c56d0
Author: Logan Zhu <[email protected]>
AuthorDate: Wed Mar 5 15:15:03 2025 +0800
KAFKA-18886 add behavior change of CreateTopicPolicy and AlterConfigPolicy
to zk2kraft (#19087)
1. Updated JavaDoc to reflect that CreateTopicPolicy and AlterConfigPolicy
run on the controller in KRaft mode.
2. Modified Behavioral Change Reference in the HTML docs to include this
change.
3. Added a warning in KafkaConfig that is logged when a broker-only node's
configuration defines either of the policy configs.
Reviewers: TengYao Chi <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++++++++
docs/zk2kraft.html | 7 +++++++
.../java/org/apache/kafka/server/config/ServerLogConfigs.java | 6 ++++--
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index db5b84b58ae..37f60691c6e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -658,6 +658,11 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
"There must be at least one broker advertised listener." + (
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all
listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
}
+ def warnIfConfigDefinedInWrongRole(expectedRole: ProcessRole, configName:
String): Unit = {
+ if (originals.containsKey(configName)) {
+ warn(s"$configName is defined in ${processRoles.mkString(", ")}. It
should be defined in the $expectedRole role.")
+ }
+ }
if (processRoles == Set(ProcessRole.BrokerRole)) {
// KRaft broker-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
@@ -682,6 +687,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
if (controllerListenerNames.size > 1) {
warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple
entries; only the first will be used since
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
}
+ // warn if create.topic.policy.class.name or
alter.config.policy.class.name is defined in the broker role
+ warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole,
ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG)
+ warnIfConfigDefinedInWrongRole(ProcessRole.ControllerRole,
ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG)
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
diff --git a/docs/zk2kraft.html b/docs/zk2kraft.html
index a33c972b10b..fc3a9155ebe 100644
--- a/docs/zk2kraft.html
+++ b/docs/zk2kraft.html
@@ -241,6 +241,13 @@
<strong>Configuration Value Size Limitation</strong>: KRaft mode
restricts configuration values to a maximum size of
<code>Short.MAX_VALUE</code>,
which prevents using the append operation to create larger
configuration values.
</li>
+ <li>
+ <strong>Policy Class Deployment</strong>:
+ In KRaft mode, the <code>CreateTopicPolicy</code> and
<code>AlterConfigPolicy</code> plugins run on the controller instead of the
broker.
+ This requires users to deploy the policy class JAR files on the
controller and configure the parameters
+ (<code>create.topic.policy.class.name</code> and
<code>alter.config.policy.class.name</code>) on the controller.
+ <p>Note: If migrating from ZooKeeper mode, ensure policy JARs are
moved from brokers to controllers.</p>
+ </li>
</ul>
</div>
<!--#include virtual="../includes/_footer.htm" -->
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index ac11be948ad..aeb4de47cf3 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -139,10 +139,12 @@ public class ServerLogConfigs {
public static final String CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG =
"create.topic.policy.class.name";
public static final String CREATE_TOPIC_POLICY_CLASS_NAME_DOC = "The
create topic policy class that should be used for validation. The class should
" +
- "implement the
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface.";
+ "implement the
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface. " +
+ "<p>Note: This policy runs on the controller instead of the
broker.</p>";
public static final String ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG =
"alter.config.policy.class.name";
public static final String ALTER_CONFIG_POLICY_CLASS_NAME_DOC = "The alter
configs policy class that should be used for validation. The class should " +
- "implement the
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface.";
+ "implement the
<code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface. " +
+ "<p>Note: This policy runs on the controller instead of the
broker.</p>";
public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX +
"initial.task.delay.ms";
public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;