This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6d68f8a82c9 MINOR: Move BrokerReconfigurable to the sever-common module (#19383) 6d68f8a82c9 is described below commit 6d68f8a82c9c4a9d69a19ee40143f7c7b253dfe6 Author: TengYao Chi <kiting...@gmail.com> AuthorDate: Mon Apr 7 07:39:01 2025 +0800 MINOR: Move BrokerReconfigurable to the sever-common module (#19383) This patch moves `BrokerReconfigurable` to the `server-common module` and decouples the `TransactionLogConfig` and `KafkaConfig` to unblock KAFKA-14485. Reviewers: PoAn Yang <pay...@apache.org>, TaiJuWu <tjwu1...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- checkstyle/import-control-server-common.xml | 2 +- checkstyle/import-control-server.xml | 1 + .../coordinator/transaction/TransactionCoordinator.scala | 13 +++++++------ core/src/main/scala/kafka/log/LogManager.scala | 6 ++++-- .../scala/kafka/server/AutoTopicCreationManager.scala | 6 ++++-- .../src/main/scala/kafka/server/DynamicBrokerConfig.scala | 7 ++++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +-- core/src/main/scala/kafka/server/ReplicaManager.scala | 8 ++++++-- .../kafka/server/metadata/BrokerMetadataPublisher.scala | 4 +++- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 6 ++++-- .../org/apache/kafka}/config/BrokerReconfigurable.java | 15 +++++++-------- .../apache/kafka/server/config/AbstractKafkaConfig.java | 2 -- .../server/config/DynamicProducerStateManagerConfig.java | 10 ++++++---- 13 files changed, 48 insertions(+), 35 deletions(-) diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 20f6e32fc9a..2b5c9002c9b 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -38,7 +38,7 @@ <disallow pkg="kafka" /> <!-- anyone can use public classes --> - <allow pkg="org.apache.kafka.common" exact-match="true" /> + <allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.common.security" /> <allow pkg="org.apache.kafka.common.serialization" /> <allow pkg="org.apache.kafka.common.utils" /> diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index 05e67a153bd..b45b3c41c27 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -59,6 +59,7 @@ <allow pkg="org.apache.kafka.metadata" /> <!-- utilities and reusable classes from server-common --> + <allow pkg="org.apache.kafka.config"/> <allow pkg="org.apache.kafka.queue" /> <allow pkg="org.apache.kafka.security" /> <allow pkg="org.apache.kafka.server.common" /> diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 697d6a30144..ed8140e596f 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult} import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time} -import org.apache.kafka.coordinator.transaction.ProducerIdManager +import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} import org.apache.kafka.server.util.Scheduler @@ -46,13 +46,14 @@ object TransactionCoordinator { metadataCache: MetadataCache, time: Time): TransactionCoordinator = { + val transactionLogConfig = new TransactionLogConfig(config) val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs, config.transactionStateManagerConfig.transactionMaxTimeoutMs, - config.transactionLogConfig.transactionTopicPartitions, - config.transactionLogConfig.transactionTopicReplicationFactor, - config.transactionLogConfig.transactionTopicSegmentBytes, - config.transactionLogConfig.transactionLoadBufferSize, - config.transactionLogConfig.transactionTopicMinISR, + transactionLogConfig.transactionTopicPartitions, + transactionLogConfig.transactionTopicReplicationFactor, + transactionLogConfig.transactionTopicSegmentBytes, + transactionLogConfig.transactionLoadBufferSize, + transactionLogConfig.transactionTopicMinISR, config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs, config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs, config.transactionStateManagerConfig.transaction2PCEnabled, diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 2bafe735283..4b255e9a66e 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool} import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid} import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils} import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException} +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import scala.jdk.CollectionConverters._ import scala.collection._ @@ -1548,6 +1549,7 @@ object LogManager { val defaultLogConfig = new LogConfig(defaultProps) val cleanerConfig = LogCleaner.cleanerConfig(config) + val transactionLogConfig = new TransactionLogConfig(config) new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile), @@ -1560,8 +1562,8 @@ object LogManager { flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs, - producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable), - producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs, + producerStateManagerConfig = new ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, transactionLogConfig.transactionPartitionVerificationEnable), + producerIdExpirationCheckIntervalMs = transactionLogConfig.producerIdExpirationCheckIntervalMs, scheduler = kafkaScheduler, brokerTopicStats = brokerTopicStats, logDirFailureChannel = logDirFailureChannel, diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index 85308473bf6..a2c2bd4d80b 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import scala.collection.{Map, Seq, Set, mutable} @@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager( .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor) .setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs)) case TRANSACTION_STATE_TOPIC_NAME => + val transactionLogConfig = new TransactionLogConfig(config) new CreatableTopic() .setName(topic) - .setNumPartitions(config.transactionLogConfig.transactionTopicPartitions) - .setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor) + .setNumPartitions(transactionLogConfig.transactionTopicPartitions) + .setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor) .setConfigs(convertToTopicConfigCollections( txnCoordinator.transactionTopicConfigs)) case SHARE_GROUP_STATE_TOPIC_NAME => diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 29a3971d37f..6c4066c9db2 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -35,10 +35,11 @@ import org.apache.kafka.common.metrics.{Metrics, MetricsReporter} import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils} +import org.apache.kafka.config import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.KafkaRaftClient -import org.apache.kafka.server.{ProcessRole, DynamicThreadPool} +import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -323,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging reconfigurables.add(reconfigurable) } - def addBrokerReconfigurable(reconfigurable: org.apache.kafka.server.config.BrokerReconfigurable): Unit = { + def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala) brokerReconfigurables.add(new BrokerReconfigurable { override def reconfigurableConfigs: Set[String] = reconfigurable.reconfigurableConfigs().asScala @@ -617,7 +618,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } /** - * Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead. + * Implement [[config.BrokerReconfigurable]] instead. */ trait BrokerReconfigurable { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2b8be8518b5..87bfd9cd21f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.share.ShareCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig} +import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AuthorizerUtils @@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this) def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig - override val transactionLogConfig = new TransactionLogConfig(this) private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this) private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this) def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9b5c01a85c9..aa08fc93fff 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{Exit, Time, Utils} +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.MetadataCache @@ -1038,10 +1039,13 @@ class ReplicaManager(val config: KafkaConfig, callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit, transactionSupportedOperation: TransactionSupportedOperation ): Unit = { + def transactionPartitionVerificationEnable = { + new TransactionLogConfig(config).transactionPartitionVerificationEnable + } // Skip verification if the request is not transactional or transaction verification is disabled. - if (transactionalId == null || - (!config.transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump) + if (transactionalId == null || addPartitionsToTxnManager.isEmpty + || (!transactionSupportedOperation.supportsEpochBump && !transactionPartitionVerificationEnable) ) { callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard])) return diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index ead385e7447..de8f16e1e58 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.internals.Topic import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} @@ -331,9 +332,10 @@ class BrokerMetadataPublisher( case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t) } try { + val transactionLogConfig = new TransactionLogConfig(config) // Start the transaction coordinator. txnCoordinator.startup(() => metadataCache.numPartitions( - Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions)) + Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions)) } catch { case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5ae1262df40..a8e760c7b7a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2523,7 +2523,8 @@ class ReplicaManagerTest { val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) - TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.") + val transactionLogConfig = new TransactionLogConfig(config) + TestUtils.waitUntilTrue(() => transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.") // Try to append more records. We don't need to send a request since the transaction is already ongoing. val moreTransactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1, @@ -2575,7 +2576,8 @@ class ReplicaManagerTest { val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) - TestUtils.waitUntilTrue(() => !config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.") + val transactionLogConfig = new TransactionLogConfig(config) + TestUtils.waitUntilTrue(() => !transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.") // Confirm we did not write to the log and instead returned error. val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue diff --git a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java similarity index 88% rename from server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java rename to server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java index 2b3fe9dcf34..b7076f05ef3 100644 --- a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java +++ b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.server.config; +package org.apache.kafka.config; + +import org.apache.kafka.common.config.AbstractConfig; import java.util.Set; @@ -27,13 +29,10 @@ import java.util.Set; * The reconfiguration process follows three steps: * <ol> * <li>Determining which configurations can be dynamically updated via {@link #reconfigurableConfigs()}</li> - * <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractKafkaConfig)}</li> - * <li>Applying the new configuration via {@link #reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li> + * <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}</li> + * <li>Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}</li> * </ol> * <strong>Note: Since Kafka is eliminating Scala, developers should implement this interface instead of {@link kafka.server.BrokerReconfigurable}</strong> - * - * - * @see AbstractKafkaConfig */ public interface BrokerReconfigurable { /** @@ -55,7 +54,7 @@ public interface BrokerReconfigurable { * * @param newConfig the new configuration to validate */ - void validateReconfiguration(AbstractKafkaConfig newConfig); + void validateReconfiguration(AbstractConfig newConfig); /** * Applies the new configuration. @@ -65,5 +64,5 @@ public interface BrokerReconfigurable { * @param oldConfig the previous configuration * @param newConfig the new configuration to apply */ - void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig); + void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig); } diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index fc6906b96d3..87bf18a412f 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -83,6 +83,4 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { public int backgroundThreads() { return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG); } - - public abstract TransactionLogConfig transactionLogConfig(); } diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java index 4158194f000..de9289e09a7 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.server.config; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.config.BrokerReconfigurable; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; @@ -39,16 +41,16 @@ public class DynamicProducerStateManagerConfig implements BrokerReconfigurable { } @Override - public void validateReconfiguration(AbstractKafkaConfig newConfig) { - TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig(); + public void validateReconfiguration(AbstractConfig newConfig) { + TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig); if (transactionLogConfig.producerIdExpirationMs() < 0) throw new ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot be less than 0, current value is " + producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + transactionLogConfig.producerIdExpirationMs()); } @Override - public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig) { - TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig(); + public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) { + TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig); if (producerStateManagerConfig.producerIdExpirationMs() != transactionLogConfig.producerIdExpirationMs()) { log.info("Reconfigure {} from {} to {}", TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,