This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b83a23a4f90 KAFKA-18946 Move BrokerReconfigurable and
DynamicProducerStateManagerConfig to server module (#19174)
b83a23a4f90 is described below
commit b83a23a4f90f757b3fcec05c93f27385daa49bcd
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Mar 20 21:30:19 2025 +0800
KAFKA-18946 Move BrokerReconfigurable and DynamicProducerStateManagerConfig
to server module (#19174)
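This patch moves `DynamicProducerStateManagerConfig` to the server module and
adds a Java `BrokerReconfigurable` interface there. The Scala
`kafka.server.BrokerReconfigurable` trait is kept for existing implementations;
`DynamicBrokerConfig.addBrokerReconfigurable` wraps implementations of the new
Java interface in an adapter for that trait.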
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 39 +++++-------
core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +-
.../kafka/server/config/AbstractKafkaConfig.java | 2 +
.../kafka/server/config/BrokerReconfigurable.java | 69 ++++++++++++++++++++++
.../config/DynamicProducerStateManagerConfig.java | 67 +++++++++++++++++++++
5 files changed, 155 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b9b25c1afeb..29a3971d37f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -40,12 +40,12 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs,
ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig,
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin,
MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.apache.kafka.storage.internals.log.{LogConfig,
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -323,6 +323,17 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
reconfigurables.add(reconfigurable)
}
+  /**
+   * Registers a reconfigurable that implements the Java
+   * [[org.apache.kafka.server.config.BrokerReconfigurable]] interface.
+   *
+   * The reconfigurable's config names are verified first; it is then wrapped in an adapter
+   * for the Scala [[BrokerReconfigurable]] trait, which forwards every call to it, and the
+   * adapter is added to `brokerReconfigurables`.
+   *
+   * @param reconfigurable the Java-based reconfigurable to register
+   */
+  def addBrokerReconfigurable(reconfigurable: org.apache.kafka.server.config.BrokerReconfigurable): Unit = {
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+
+    // Adapter that exposes the Java interface through the Scala trait by pure delegation.
+    val adapter: BrokerReconfigurable = new BrokerReconfigurable {
+      override def reconfigurableConfigs: Set[String] =
+        reconfigurable.reconfigurableConfigs().asScala
+
+      override def validateReconfiguration(newConfig: KafkaConfig): Unit =
+        reconfigurable.validateReconfiguration(newConfig)
+
+      override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit =
+        reconfigurable.reconfigure(oldConfig, newConfig)
+    }
+
+    brokerReconfigurables.add(adapter)
+  }
+
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(reconfigurable)
@@ -605,6 +616,9 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
}
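+/**
+ * New code should implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead of this trait and
+ * register it through the `addBrokerReconfigurable` overload of `DynamicBrokerConfig` that accepts that interface.
+ */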
trait BrokerReconfigurable {
def reconfigurableConfigs: Set[String]
@@ -977,27 +991,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
}
-class DynamicProducerStateManagerConfig(val producerStateManagerConfig:
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
- def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
- if (producerStateManagerConfig.producerIdExpirationMs !=
newConfig.transactionLogConfig.producerIdExpirationMs) {
- info(s"Reconfigure
${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from
${producerStateManagerConfig.producerIdExpirationMs} to
${newConfig.transactionLogConfig.producerIdExpirationMs}")
-
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs)
- }
- if (producerStateManagerConfig.transactionVerificationEnabled !=
newConfig.transactionLogConfig.transactionPartitionVerificationEnable) {
- info(s"Reconfigure
${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from
${producerStateManagerConfig.transactionVerificationEnabled} to
${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}")
-
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable)
- }
- }
-
- def validateReconfiguration(newConfig: KafkaConfig): Unit = {
- if (newConfig.transactionLogConfig.producerIdExpirationMs < 0)
- throw new
ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG}
cannot be less than 0, current value is
${producerStateManagerConfig.producerIdExpirationMs}, and new value is
${newConfig.transactionLogConfig.producerIdExpirationMs}")
- }
-
- override def reconfigurableConfigs: Set[String] =
DynamicProducerStateManagerConfig
-
-}
-
class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable
with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicRemoteLogConfig.ReconfigurableConfigs
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1c91f58492e..608e27f25e4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -204,10 +204,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
- private val _transactionLogConfig = new TransactionLogConfig(this)
+ override val transactionLogConfig = new TransactionLogConfig(this)
private val _transactionStateManagerConfig = new
TransactionStateManagerConfig(this)
private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
- def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
def transactionStateManagerConfig: TransactionStateManagerConfig =
_transactionStateManagerConfig
def addPartitionsToTxnConfig: AddPartitionsToTxnConfig =
_addPartitionsToTxnConfig
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 87bf18a412f..fc6906b96d3 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -83,4 +83,6 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
public int backgroundThreads() {
return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
}
+
+ public abstract TransactionLogConfig transactionLogConfig();
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
b/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
new file mode 100644
index 00000000000..2b3fe9dcf34
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import java.util.Set;
+
+/**
+ * An interface for Kafka broker configs that support dynamic reconfiguration.
+ * <p>
+ * Components that implement this interface can have their configurations updated
+ * at runtime without requiring a broker restart.
+ * <p>
+ * The reconfiguration process follows three steps:
+ * <ol>
+ * <li>Determining which configurations can be dynamically updated via {@link #reconfigurableConfigs()}</li>
+ * <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractKafkaConfig)}</li>
+ * <li>Applying the new configuration via {@link #reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
+ * </ol>
+ * <strong>Note: Since Kafka is eliminating Scala, developers should implement this interface instead of
+ * {@link kafka.server.BrokerReconfigurable}.</strong>
+ * <p>
+ * Implementations are registered through {@code DynamicBrokerConfig#addBrokerReconfigurable}, which wraps
+ * them in an adapter for the Scala trait and forwards every call to the implementation.
+ *
+ * @see AbstractKafkaConfig
+ */
+public interface BrokerReconfigurable {
+ /**
+ * Returns the set of configuration keys that can be dynamically reconfigured.
+ *
+ * <p>
+ * Only the configurations returned by this method will be considered for
+ * dynamic updates by the broker.
+ *
+ * @return a set of configuration key names that can be dynamically updated
+ */
+ Set<String> reconfigurableConfigs();
+
+ /**
+ * Validates the new configuration before applying it.
+ * <p>
+ * This method should verify that the new configuration values are valid and
+ * can be safely applied. Implementations reject an invalid configuration by
+ * throwing; for example, {@link DynamicProducerStateManagerConfig} throws a
+ * {@code ConfigException} when the new producer id expiration is negative.
+ *
+ * @param newConfig the new configuration to validate
+ */
+ void validateReconfiguration(AbstractKafkaConfig newConfig);
+
+ /**
+ * Applies the new configuration.
+ * <p>
+ * This method is called after the new configuration has been validated.
+ *
+ * @param oldConfig the previous configuration
+ * @param newConfig the new configuration to apply
+ */
+ void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig
newConfig);
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
new file mode 100644
index 00000000000..4158194f000
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * A {@link BrokerReconfigurable} that applies dynamic updates of two transaction log settings,
+ * {@link TransactionLogConfig#PRODUCER_ID_EXPIRATION_MS_CONFIG} and
+ * {@link TransactionLogConfig#TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG},
+ * to the wrapped {@link ProducerStateManagerConfig}.
+ */
+public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
+    private final Logger log = LoggerFactory.getLogger(DynamicProducerStateManagerConfig.class);
+    private final ProducerStateManagerConfig producerStateManagerConfig;
+
+    /**
+     * @param producerStateManagerConfig the config object that is updated in place by {@link #reconfigure}
+     */
+    public DynamicProducerStateManagerConfig(ProducerStateManagerConfig producerStateManagerConfig) {
+        this.producerStateManagerConfig = producerStateManagerConfig;
+    }
+
+    /**
+     * @return the names of the two configs this class can update dynamically
+     */
+    @Override
+    public Set<String> reconfigurableConfigs() {
+        return Set.of(
+            TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
+            TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+    }
+
+    /**
+     * Rejects a new configuration whose producer id expiration is negative.
+     *
+     * @param newConfig the new configuration to validate
+     * @throws ConfigException if the new producer id expiration is less than 0
+     */
+    @Override
+    public void validateReconfiguration(AbstractKafkaConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
+        if (transactionLogConfig.producerIdExpirationMs() < 0)
+            // The leading space keeps the config name separated from the rest of the message.
+            throw new ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG +
+                " cannot be less than 0, current value is " +
+                producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " +
+                transactionLogConfig.producerIdExpirationMs());
+    }
+
+    /**
+     * Copies each handled setting from {@code newConfig} into the wrapped
+     * {@link ProducerStateManagerConfig} when it differs from the current value,
+     * logging every change. {@code oldConfig} is not consulted.
+     *
+     * @param oldConfig the previous configuration (unused)
+     * @param newConfig the new configuration to apply
+     */
+    @Override
+    public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
+        if (producerStateManagerConfig.producerIdExpirationMs() != transactionLogConfig.producerIdExpirationMs()) {
+            log.info("Reconfigure {} from {} to {}",
+                TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
+                producerStateManagerConfig.producerIdExpirationMs(),
+                transactionLogConfig.producerIdExpirationMs());
+            producerStateManagerConfig.setProducerIdExpirationMs(transactionLogConfig.producerIdExpirationMs());
+        }
+        if (producerStateManagerConfig.transactionVerificationEnabled() != transactionLogConfig.transactionPartitionVerificationEnable()) {
+            log.info("Reconfigure {} from {} to {}",
+                TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
+                producerStateManagerConfig.transactionVerificationEnabled(),
+                transactionLogConfig.transactionPartitionVerificationEnable());
+            producerStateManagerConfig.setTransactionVerificationEnabled(transactionLogConfig.transactionPartitionVerificationEnable());
+        }
+    }
+}