This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f59d829381a KAFKA-15853 Move TransactionLogConfig and TransactionStateManagerConfig getters out of KafkaConfig (#16665)
f59d829381a is described below

commit f59d829381a7021971b0b2835fbbacb4abd996a5
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Tue Sep 3 11:24:12 2024 +0100

    KAFKA-15853 Move TransactionLogConfig and TransactionStateManagerConfig getters out of KafkaConfig (#16665)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control.xml                      |  2 +-
 .../transaction/TransactionCoordinator.scala       | 18 +++---
 .../transaction/TransactionStateManager.scala      | 26 ++++----
 core/src/main/scala/kafka/log/LogManager.scala     |  6 +-
 .../kafka/server/AutoTopicCreationManager.scala    |  4 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 20 +++---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 23 ++-----
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  2 +-
 .../admin/AdminFenceProducersIntegrationTest.scala | 10 +--
 .../api/AbstractAuthorizerIntegrationTest.scala    |  8 +--
 .../kafka/api/GroupAuthorizerIntegrationTest.scala |  8 +--
 .../kafka/api/ProducerIdExpirationTest.scala       | 20 +++---
 .../SaslClientsWithInvalidCredentialsTest.scala    |  6 +-
 .../kafka/api/TransactionsBounceTest.scala         |  6 +-
 .../kafka/api/TransactionsExpirationTest.scala     | 18 +++---
 .../integration/kafka/api/TransactionsTest.scala   | 10 +--
 .../api/TransactionsWithMaxInFlightOneTest.scala   | 10 +--
 .../server/DynamicBrokerReconfigurationTest.scala  |  6 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  6 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |  6 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  4 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  4 +-
 .../transaction/TransactionCoordinatorTest.scala   | 12 ++--
 .../log/AbstractLogCleanerIntegrationTest.scala    |  6 +-
 .../unit/kafka/log/BrokerCompressionTest.scala     |  6 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 10 +--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  8 +--
 .../scala/unit/kafka/log/LogConcurrencyTest.scala  |  6 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 12 ++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 10 +--
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  4 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  6 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  6 +-
 .../server/AutoTopicCreationManagerTest.scala      |  8 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 12 ++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 16 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 10 +--
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |  6 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |  6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  6 +-
 .../kafka/server/config/AbstractKafkaConfig.java   |  8 +--
 .../internals/log/ProducerStateManagerTest.java    |  2 +-
 .../integration/utils/EmbeddedKafkaCluster.java    |  4 +-
 ...onLogConfigs.java => TransactionLogConfig.java} | 71 +++++++++++++++++++---
 ...igs.java => TransactionStateManagerConfig.java} | 37 +++++++++--
 47 files changed, 284 insertions(+), 215 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 285163816f2..f7d7ef798bd 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -406,7 +406,7 @@
       <allow pkg="org.apache.kafka.tools" />
       <allow pkg="org.apache.kafka.server.config" />
       <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
-      <allow 
class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
+      <allow 
class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
       <allow pkg="org.apache.kafka.coordinator.group" />
       <allow pkg="org.apache.kafka.network" />
     </subpackage>
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index b3d5d9ef480..586ba1b4f98 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -44,15 +44,15 @@ object TransactionCoordinator {
             metadataCache: MetadataCache,
             time: Time): TransactionCoordinator = {
 
-    val txnConfig = TransactionConfig(config.transactionalIdExpirationMs,
-      config.transactionMaxTimeoutMs,
-      config.transactionTopicPartitions,
-      config.transactionTopicReplicationFactor,
-      config.transactionTopicSegmentBytes,
-      config.transactionsLoadBufferSize,
-      config.transactionTopicMinISR,
-      config.transactionAbortTimedOutTransactionCleanupIntervalMs,
-      config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
+    val txnConfig = 
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
+      config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+      config.transactionLogConfig.transactionTopicPartitions,
+      config.transactionLogConfig.transactionTopicReplicationFactor,
+      config.transactionLogConfig.transactionTopicSegmentBytes,
+      config.transactionLogConfig.transactionsLoadBufferSize,
+      config.transactionLogConfig.transactionTopicMinISR,
+      
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
+      
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
       config.requestTimeoutMs)
 
     val txnStateManager = new TransactionStateManager(config.brokerId, 
scheduler, replicaManager, metadataCache, txnConfig,
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2894bbdd94b..3daf966d8c3 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -35,7 +35,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.common.TransactionVersion
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.record.BrokerCompressionType
@@ -92,13 +92,13 @@ class TransactionStateManager(brokerId: Int,
   @volatile private var transactionTopicPartitionCount: Int = _
 
   /** setup metrics*/
-  private val partitionLoadSensor = 
metrics.sensor(TransactionStateManagerConfigs.LOAD_TIME_SENSOR)
+  private val partitionLoadSensor = 
metrics.sensor(TransactionStateManagerConfig.LOAD_TIME_SENSOR)
 
   partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
-    TransactionStateManagerConfigs.METRICS_GROUP,
+    TransactionStateManagerConfig.METRICS_GROUP,
     "The max time it took to load the partitions in the last 30sec"), new 
Max())
   partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
-    TransactionStateManagerConfigs.METRICS_GROUP,
+    TransactionStateManagerConfig.METRICS_GROUP,
     "The avg time it took to load the partitions in the last 30sec"), new 
Avg())
 
   private[transaction] def usesFlexibleRecords(): Boolean = {
@@ -809,15 +809,15 @@ private[transaction] case class 
TxnMetadataCacheEntry(coordinatorEpoch: Int,
 private[transaction] case class 
CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int,
                                                                
transactionMetadata: TransactionMetadata)
 
-private[transaction] case class TransactionConfig(transactionalIdExpirationMs: 
Int = TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
-                                                  transactionMaxTimeoutMs: Int 
= TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
-                                                  transactionLogNumPartitions: 
Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
-                                                  
transactionLogReplicationFactor: Short = 
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
-                                                  transactionLogSegmentBytes: 
Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
-                                                  
transactionLogLoadBufferSize: Int = 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
-                                                  
transactionLogMinInsyncReplicas: Int = 
TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
-                                                  
abortTimedOutTransactionsIntervalMs: Int = 
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
-                                                  
removeExpiredTransactionalIdsIntervalMs: Int = 
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
+private[transaction] case class TransactionConfig(transactionalIdExpirationMs: 
Int = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
+                                                  transactionMaxTimeoutMs: Int 
= TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
+                                                  transactionLogNumPartitions: 
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
+                                                  
transactionLogReplicationFactor: Short = 
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+                                                  transactionLogSegmentBytes: 
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
+                                                  
transactionLogLoadBufferSize: Int = 
TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
+                                                  
transactionLogMinInsyncReplicas: Int = 
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
+                                                  
abortTimedOutTransactionsIntervalMs: Int = 
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
+                                                  
removeExpiredTransactionalIdsIntervalMs: Int = 
TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
                                                   requestTimeoutMs: Int = 
ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)
 
 case class TransactionalIdAndProducerIdEpoch(transactionalId: String, 
producerId: Long, producerEpoch: Short) {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index e9d782bba9d..3a447e22ef8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1588,9 +1588,9 @@ object LogManager {
       flushRecoveryOffsetCheckpointMs = 
config.logFlushOffsetCheckpointIntervalMs,
       flushStartOffsetCheckpointMs = 
config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
-      maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(config.producerIdExpirationMs, 
config.transactionPartitionVerificationEnable),
-      producerIdExpirationCheckIntervalMs = 
config.producerIdExpirationCheckIntervalMs,
+      maxTransactionTimeoutMs = 
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, 
config.transactionLogConfig.transactionPartitionVerificationEnable),
+      producerIdExpirationCheckIntervalMs = 
config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index a5c0146f84c..d0efdb910b8 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -240,8 +240,8 @@ class DefaultAutoTopicCreationManager(
       case TRANSACTION_STATE_TOPIC_NAME =>
         new CreatableTopic()
           .setName(topic)
-          .setNumPartitions(config.transactionTopicPartitions)
-          .setReplicationFactor(config.transactionTopicReplicationFactor)
+          
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
+          
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(
             txnCoordinator.transactionTopicConfigs))
       case topicName =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 23c99e5bcb3..a1d03cf9bce 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ProcessRole
@@ -87,7 +87,7 @@ import scala.jdk.CollectionConverters._
 object DynamicBrokerConfig {
 
   private[server] val DynamicSecurityConfigs = 
SslConfigs.RECONFIGURABLE_CONFIGS.asScala
-  private[server] val DynamicProducerStateManagerConfig = 
Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
+  private[server] val DynamicProducerStateManagerConfig = 
Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
 
   val AllDynamicConfigs = DynamicSecurityConfigs ++
     LogCleaner.ReconfigurableConfigs ++
@@ -1128,19 +1128,19 @@ class DynamicListenerConfig(server: KafkaBroker) 
extends BrokerReconfigurable wi
 
 class DynamicProducerStateManagerConfig(val producerStateManagerConfig: 
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    if (producerStateManagerConfig.producerIdExpirationMs != 
newConfig.producerIdExpirationMs) {
-      info(s"Reconfigure 
${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.producerIdExpirationMs}")
-      
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
+    if (producerStateManagerConfig.producerIdExpirationMs != 
newConfig.transactionLogConfig.producerIdExpirationMs) {
+      info(s"Reconfigure 
${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.transactionLogConfig.producerIdExpirationMs}")
+      
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs)
     }
-    if (producerStateManagerConfig.transactionVerificationEnabled != 
newConfig.transactionPartitionVerificationEnable) {
-      info(s"Reconfigure 
${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionPartitionVerificationEnable}")
-      
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
+    if (producerStateManagerConfig.transactionVerificationEnabled != 
newConfig.transactionLogConfig.transactionPartitionVerificationEnable) {
+      info(s"Reconfigure 
${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}")
+      
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable)
     }
   }
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    if (newConfig.producerIdExpirationMs < 0)
-      throw new 
ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} 
cannot be less than 0, current value is 
${producerStateManagerConfig.producerIdExpirationMs}, and new value is 
${newConfig.producerIdExpirationMs}")
+    if (newConfig.transactionLogConfig.producerIdExpirationMs < 0)
+      throw new 
ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} 
cannot be less than 0, current value is 
${producerStateManagerConfig.producerIdExpirationMs}, and new value is 
${newConfig.transactionLogConfig.producerIdExpirationMs}")
   }
 
   override def reconfigurableConfigs: Set[String] = 
DynamicProducerStateManagerConfig
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5f099b31724..46bebf3c116 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -238,6 +238,11 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _shareGroupConfig = new ShareGroupConfig(this)
   def shareGroupConfig: ShareGroupConfig = _shareGroupConfig
 
+  private val _transactionLogConfig = new TransactionLogConfig(this)
+  private val _transactionStateManagerConfig = new 
TransactionStateManagerConfig(this)
+  def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
+  def transactionStateManagerConfig: TransactionStateManagerConfig = 
_transactionStateManagerConfig
+
   private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: 
String): Boolean = {
     // Use the system property if it exists and the Kafka config value was 
defaulted rather than actually provided
     // Need to translate any system property value from true/false (String) to 
true/false (Boolean)
@@ -596,22 +601,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     protocols
   }
 
-  /** ********* Transaction management configuration ***********/
-  val transactionalIdExpirationMs = 
getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
-  val transactionMaxTimeoutMs = 
getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
-  val transactionTopicMinISR = 
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG)
-  val transactionsLoadBufferSize = 
getInt(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG)
-  val transactionTopicReplicationFactor = 
getShort(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG)
-  val transactionTopicPartitions = 
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG)
-  val transactionTopicSegmentBytes = 
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG)
-  val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG)
-  val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG)
-
-  def transactionPartitionVerificationEnable = 
getBoolean(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
-
-  def producerIdExpirationMs = 
getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG)
-  val producerIdExpirationCheckIntervalMs = 
getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG)
-
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG)
   val metricSampleWindowMs = 
getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7edd80467ed..a34c075822f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -525,7 +525,7 @@ class KafkaServer(
         transactionCoordinator = TransactionCoordinator(config, 
replicaManager, new KafkaScheduler(1, true, "transaction-log-manager-"),
           () => producerIdManager, metrics, metadataCache, Time.SYSTEM)
         transactionCoordinator.startup(
-          () => 
zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
+          () => 
zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))
 
         /* start auto topic creation manager */
         this.autoTopicCreationManager = AutoTopicCreationManager(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 09c3b1dc4d4..e9c114567d9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
   ): Unit = {
     // Skip verification if the request is not transactional or transaction 
verification is disabled.
     if (transactionalId == null ||
-      !config.transactionPartitionVerificationEnable
+      !config.transactionLogConfig.transactionPartitionVerificationEnable
       || addPartitionsToTxnManager.isEmpty
     ) {
       callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, 
VerificationGuard]))
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 680b8833581..386d355a49c 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -308,7 +308,7 @@ class BrokerMetadataPublisher(
     try {
       // Start the transaction coordinator.
       txnCoordinator.startup(() => metadataCache.numPartitions(
-        
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
+        
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
TransactionCoordinator", t)
     }
diff --git 
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
index 5754dd12787..2e2847b3f4f 100644
--- 
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.errors.{InvalidProducerEpochException, 
ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
@@ -63,10 +63,10 @@ class AdminFenceProducersIntegrationTest extends 
IntegrationTestHarness {
     val props = new Properties()
     props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
false.toString)
     // Set a smaller value for the number of partitions for speed
-    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-    
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
1.toString)
-    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
1.toString)
-    
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "2000")
+    props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
+    
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
1.toString)
+    props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
1.toString)
+    
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "2000")
     props
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 5e181b9e823..42addf1dd54 100644
--- 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -29,7 +29,7 @@ import 
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRA
 import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -114,9 +114,9 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
 
     properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
     
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
"1")
-    properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
-    
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
-    properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
"1")
+    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
+    
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
+    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
     properties.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
     properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
classOf[PrincipalBuilder].getName)
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index e053966c079..7723c38ac5e 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
 import org.apache.kafka.server.config.ServerConfigs
@@ -91,9 +91,9 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
 
     properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
     
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
"1")
-    properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
-    
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
-    properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
"1")
+    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
+    
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
+    properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
     properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
classOf[GroupPrincipalBuilder].getName)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index f9e5d791695..69184a5e158 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
@@ -204,7 +204,7 @@ class ProducerIdExpirationTest extends 
KafkaServerTestHarness {
   }
 
   private def producerIdExpirationConfig(configValue: String): 
util.Map[ConfigResource, util.Collection[AlterConfigOp]] = {
-    val producerIdCfg = new 
ConfigEntry(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
+    val producerIdCfg = new 
ConfigEntry(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
     val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg, 
AlterConfigOp.OpType.SET))
     Collections.singletonMap(configResource, configs)
   }
@@ -230,18 +230,18 @@ class ProducerIdExpirationTest extends 
KafkaServerTestHarness {
     // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
     // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long.
     serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 2.toString)
-    serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
+    
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 2.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
     serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
true.toString)
     serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
false.toString)
     serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
false.toString)
     
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"0")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
 "5000")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 "500")
-    serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
"10000")
-    
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 "500")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
 "5000")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 "500")
+    serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
"10000")
+    
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 "500")
     serverProps
   }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index f8440226caa..7190c939b2f 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -27,8 +27,8 @@ import org.junit.jupiter.api.Assertions._
 import kafka.utils.TestUtils
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -43,8 +43,8 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   val brokerCount = 1
 
   
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
-  
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
-  
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
 "1")
+  
this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
+  
this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
 "1")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
 
   val topic = "topic"
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 3cb508958a4..ee47dcae05f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -24,8 +24,8 @@ import org.apache.kafka.clients.consumer.{Consumer, 
ConsumerConfig}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
@@ -57,8 +57,8 @@ class TransactionsBounceTest extends IntegrationTestHarness {
   overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 2.toString)
   
overridingProps.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
"10") // set small enough session timeout
   
overridingProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 "0")
-  
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-  
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 3.toString)
+  
overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
+  
overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 3.toString)
 
   // This is the one of the few tests we currently allow to preallocate ports, 
despite the fact that this can result in transient
   // failures due to ports getting reused. We can't use random ports because 
of bad behavior that can result from bouncing
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index a31385224fd..6063d522992 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -205,18 +205,18 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
     // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long.
     serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 2.toString)
-    serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
+    
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 2.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
     serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
true.toString)
     serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
false.toString)
     serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
false.toString)
     
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"0")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
 "10000")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 "500")
-    serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
"5000")
-    
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 "500")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
 "10000")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 "500")
+    serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
"5000")
+    
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 "500")
     serverProps
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 9c8b93012a4..0a64ec9ed51 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, 
ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidProducerEpochException, 
ProducerFencedException, TimeoutException}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -63,14 +63,14 @@ class TransactionsTest extends IntegrationTestHarness {
     props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
false.toString)
      // Set a smaller value for the number of partitions for the 
__consumer_offsets topic + // so that the creation of that topic/partition(s) 
and subsequent leader assignment doesn't take relatively long
     props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
-    
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
2.toString)
-    props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
+    props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
3.toString)
+    
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
2.toString)
+    props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
2.toString)
     props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString)
     props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
false.toString)
     props.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
false.toString)
     props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"0")
-    
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
+    
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
     props
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
 
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index ae73cde5e84..41674852549 100644
--- 
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -107,14 +107,14 @@ class TransactionsWithMaxInFlightOneTest extends 
KafkaServerTestHarness {
     serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
false.toString)
     serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
     
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
1.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
-    
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 1.toString)
-    serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
1.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
+    
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 1.toString)
+    serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
1.toString)
     serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
true.toString)
     serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
false.toString)
     serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
false.toString)
     
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"0")
-    
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
+    
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 "200")
     serverProps
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 01aef960dd4..03a56d9a9e0 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -61,7 +61,7 @@ import org.apache.kafka.common.requests.MetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
 import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ZkConfigs}
@@ -1274,7 +1274,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     // Dynamically turn verification off.
     val configPrefix = listenerPrefix(SecureExternal)
     val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS, 
configPrefix)
-    
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
+    
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
     alterConfigsUsingConfigCommand(updatedProps)
     verifyConfiguration(false)
 
@@ -1286,7 +1286,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     verifyConfiguration(false)
 
     // Turn verification back on.
-    
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
+    
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
     alterConfigsUsingConfigCommand(updatedProps)
     verifyConfiguration(true)
   }
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala 
b/core/src/test/scala/other/kafka/StressTestLog.scala
index 78fda569a36..a604884a06e 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -25,7 +25,7 @@ import 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -52,8 +52,8 @@ object StressTestLog {
       scheduler = time.scheduler,
       time = time,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 0811cec64c5..adef2c63809 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.compress.{Compression, 
GzipCompression, Lz4Compre
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
 import org.apache.kafka.server.util.CommandLineUtils
 import org.apache.kafka.storage.internals.log.{LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -235,8 +235,8 @@ object TestLinearWriteSpeed {
       brokerTopicStats = new BrokerTopicStats,
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 1f70ccfccd3..9711913036b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords, 
SimpleRecord}
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ReplicationConfigs
@@ -304,7 +304,7 @@ class PartitionLockTest extends Logging {
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
           log.dir, log.topicPartition, logDirFailureChannel, 
log.config.recordVersion, "", None, mockTime.scheduler)
         val maxTransactionTimeout = 5 * 60 * 1000
-        val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
+        val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
         val producerStateManager = new ProducerStateManager(
           log.topicPartition,
           log.dir,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1d5e2677c67..27e343d5f03 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -439,7 +439,7 @@ class PartitionTest extends AbstractPartitionTest {
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
           log.dir, log.topicPartition, logDirFailureChannel, 
log.config.recordVersion, "", None, time.scheduler)
         val maxTransactionTimeoutMs = 5 * 60 * 1000
-        val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 true)
+        val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 true)
         val producerStateManager = new ProducerStateManager(
           log.topicPartition,
           log.dir,
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 8ba9b1a8f79..584a49bbc2c 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, 
TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
-import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig
 import org.apache.kafka.server.util.MockScheduler
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -1004,7 +1004,7 @@ class TransactionCoordinatorTest {
       .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
 
     val expectedTransition = TxnTransitMetadata(producerId, producerId, 
(producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH,
-      txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + 
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+      txnTimeoutMs, PrepareAbort, partitions.toSet, now, now + 
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
 
     
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
       ArgumentMatchers.eq(coordinatorEpoch),
@@ -1015,7 +1015,7 @@ class TransactionCoordinatorTest {
     ).thenAnswer(_ => {})
 
     coordinator.startup(() => transactionStatePartitionCount, false)
-    
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+    
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
     scheduler.tick()
     verify(transactionManager).timedOutTransactions()
     verify(transactionManager, 
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
@@ -1064,7 +1064,7 @@ class TransactionCoordinatorTest {
       .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
metadata))))
 
     coordinator.startup(() => transactionStatePartitionCount, false)
-    
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+    
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
     scheduler.tick()
     verify(transactionManager).timedOutTransactions()
     
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
@@ -1088,7 +1088,7 @@ class TransactionCoordinatorTest {
 
     val bumpedEpoch = (producerEpoch + 1).toShort
     val expectedTransition = TxnTransitMetadata(producerId, producerId, 
bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
-      PrepareAbort, partitions.toSet, now, now + 
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+      PrepareAbort, partitions.toSet, now, now + 
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
 
     
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
       ArgumentMatchers.eq(coordinatorEpoch),
@@ -1099,7 +1099,7 @@ class TransactionCoordinatorTest {
     ).thenAnswer(_ => 
capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS))
 
     coordinator.startup(() => transactionStatePartitionCount, false)
-    
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+    
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
     scheduler.tick()
 
     verify(transactionManager).timedOutTransactions()
diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index ff94377b2f1..87470168527 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -114,8 +114,8 @@ abstract class AbstractLogCleanerIntegrationTest {
         time = time,
         brokerTopicStats = new BrokerTopicStats,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
-        producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-        producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+        producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+        producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
         logDirFailureChannel = new LogDirFailureChannel(10),
         topicId = None,
         keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 1495d688d5c..36a08b875f4 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -65,8 +65,8 @@ class BrokerCompressionTest {
       time = time,
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 978b856a9fa..44a8a90a39c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -56,7 +56,7 @@ class LogCleanerManagerTest extends Logging {
   val logConfig: LogConfig = new LogConfig(logProps)
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 
2014 for `currentTimeMs`
   val offset = 999
-  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
+  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
 
   val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = 
mutable.Map[TopicPartition, Long]()
 
@@ -107,7 +107,7 @@ class LogCleanerManagerTest extends Logging {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val config = createLowRetentionLogConfig(logSegmentSize, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
     val segments = new LogSegments(tp)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
       tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", 
None, time.scheduler)
@@ -817,7 +817,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       producerStateManagerConfig = producerStateManagerConfig,
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true)
@@ -871,7 +871,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       producerStateManagerConfig = producerStateManagerConfig,
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f8350cf7427..8db79e8186b 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, 
ProducerStateManager, ProducerStateManagerConfig}
@@ -64,7 +64,7 @@ class LogCleanerTest extends Logging {
   val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", 
"entries", time)
   val tombstoneRetentionMs = 86400000
   val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
-  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
+  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
 
   @AfterEach
   def teardown(): Unit = {
@@ -187,7 +187,7 @@ class LogCleanerTest extends Logging {
     val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
     val logSegments = new LogSegments(topicPartition)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
       dir, topicPartition, logDirFailureChannel, config.recordVersion, "", 
None, time.scheduler)
@@ -2057,7 +2057,7 @@ class LogCleanerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       producerStateManagerConfig = producerStateManagerConfig,
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 0793768bfbd..126ebdd4e69 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.SimpleRecord
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -152,8 +152,8 @@ class LogConcurrencyTest {
       brokerTopicStats = brokerTopicStats,
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 5c0bcd3b4d7..10b7ba4a507 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, 
MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
 import org.apache.kafka.server.util.{MockTime, Scheduler}
@@ -60,8 +60,8 @@ class LogLoaderTest {
   var config: KafkaConfig = _
   val brokerTopicStats = new BrokerTopicStats
   val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
-  val producerStateManagerConfig: ProducerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
-  val producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+  val producerStateManagerConfig: ProducerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
+  val producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   var logsToClose: Seq[UnifiedLog] = Seq()
@@ -103,7 +103,7 @@ class LogLoaderTest {
     val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
 
     val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
 
     // Create a LogManager with some overridden methods to facilitate 
interception of clean shutdown
     // flag and to inject an error
@@ -353,7 +353,7 @@ class LogLoaderTest {
 
     def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
       val maxTransactionTimeoutMs = 5 * 60 * 1000
-      val producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+      val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
       val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
       val logDirFailureChannel = new LogDirFailureChannel(10)
       // Intercept all segment read calls
@@ -512,7 +512,7 @@ class LogLoaderTest {
       firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
     assertEquals(firstAppendTimestamp, 
log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
 
-    val maxProducerIdExpirationMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
+    val maxProducerIdExpirationMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
     mockTime.sleep(maxProducerIdExpirationMs)
     assertEquals(Optional.empty(), 
log.producerStateManager.lastEntry(producerId))
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 9e96ec16bdc..2d5d6a849b8 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -28,7 +28,7 @@ import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopic
 import org.apache.kafka.common.requests.{AbstractControlRequest, 
LeaderAndIsrRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, 
TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{TopicImage, TopicsImage}
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
@@ -829,7 +829,7 @@ class LogManagerTest {
     val segmentBytes = 1024
 
     val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
-      5 * 60 * 1000, new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false), TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT)
+      5 * 60 * 1000, new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false), TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT)
 
     assertTrue(expectedSegmentsPerLog > 0)
     // calculate numMessages to append to logs. It'll create 
"expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -965,7 +965,7 @@ class LogManagerTest {
         recoveryPoint = 0,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
         producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 
1000, false),
-        producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+        producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
         scheduler = mock(classOf[Scheduler]),
         time = mockTime,
         brokerTopicStats = mockBrokerTopicStats,
@@ -1375,8 +1375,8 @@ class LogManagerTest {
       flushStartOffsetCheckpointMs = 10000L,
       retentionCheckMs = 1000L,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       scheduler = scheduler,
       time = Time.SYSTEM,
       brokerTopicStats = new BrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index e2272941ab3..a715bcc1ba9 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.MockScheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
@@ -640,7 +640,7 @@ class LogSegmentTest {
       topicPartition,
       logDir,
       5 * 60 * 1000,
-      new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
       new MockTime()
     )
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 28037861253..12562e4aac3 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertFalse}
 import java.nio.file.Files
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -99,8 +99,8 @@ object LogTestUtils {
                 logStartOffset: Long = 0L,
                 recoveryPoint: Long = 0L,
                 maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
-                producerStateManagerConfig: ProducerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-                producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
+                producerStateManagerConfig: ProducerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+                producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
                 keepPartitionMetadataFile: Boolean = true,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 3c5a8ab468b..1c617c20d36 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -32,7 +32,7 @@ import 
org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
@@ -66,7 +66,7 @@ class UnifiedLogTest {
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val mockTime = new MockTime()
   var logsToClose: Seq[UnifiedLog] = Seq()
-  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
+  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
   def metricsKeySet = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
   @BeforeEach
@@ -4452,7 +4452,7 @@ class UnifiedLogTest {
                         time: Time = mockTime,
                         maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
                         producerStateManagerConfig: ProducerStateManagerConfig 
= producerStateManagerConfig,
-                        producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                        producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None,
                         keepPartitionMetadataFile: Boolean = true,
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 9732b0c025d..dbd7febaaed 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
@@ -69,10 +69,10 @@ class AutoTopicCreationManagerTest {
     props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeout.toString)
 
     
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
-    
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 internalTopicPartitions.toString)
 
     props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
-    
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
+    
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
 
     config = KafkaConfig.fromProps(props)
     val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
@@ -358,7 +358,7 @@ class AutoTopicCreationManagerTest {
         case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
           numPartitions = 
config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor = 
config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
         case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
-          numPartitions = config.transactionTopicPartitions, replicationFactor 
= config.transactionTopicReplicationFactor)
+          numPartitions = 
config.transactionLogConfig.transactionTopicPartitions, replicationFactor = 
config.transactionLogConfig.transactionTopicReplicationFactor)
       }
     } else {
       getNewTopic(topicName)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7bba1dc7e04..1b685e4265d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import 
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
 import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
@@ -1340,8 +1340,8 @@ class KafkaApisTest extends Logging {
             groupId, AuthorizationResult.ALLOWED)
           Topic.GROUP_METADATA_TOPIC_NAME
         case CoordinatorType.TRANSACTION =>
-          
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
-          
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
+          
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
+          
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
           when(txnCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
           authorizeResource(authorizer, AclOperation.DESCRIBE, 
ResourceType.TRANSACTIONAL_ID,
             groupId, AuthorizationResult.ALLOWED)
@@ -1459,8 +1459,8 @@ class KafkaApisTest extends Logging {
           true
 
         case Topic.TRANSACTION_STATE_TOPIC_NAME =>
-          
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
-          
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
+          
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
+          
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
           when(txnCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
           true
         case _ =>
@@ -8880,7 +8880,7 @@ class KafkaApisTest extends Logging {
     ).build()
 
     val requestChannelRequest = buildRequest(offsetCommitRequest)
-    
+
     metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
     brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, 
None)
     kafkaApis = createKafkaApis(IBP_2_2_IV1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index f120784870a..b056c19b3f1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -34,7 +34,7 @@ import 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, 
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.PasswordEncoderConfigs
@@ -982,13 +982,13 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
-        case 
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
-        case 
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case 
TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+        case TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
         case QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 29c4c602c17..fa77e38c7c0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -51,7 +51,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
@@ -2549,9 +2549,9 @@ class ReplicaManagerTest {
       // Dynamically enable verification.
       config.dynamicConfig.initialize(None, None)
       val props = new Properties()
-      
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
+      
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
config.transactionPartitionVerificationEnable == true, "Config did not 
dynamically update.")
+      TestUtils.waitUntilTrue(() => 
config.transactionLogConfig.transactionPartitionVerificationEnable == true, 
"Config did not dynamically update.")
 
       // Try to append more records. We don't need to send a request since the 
transaction is already ongoing.
       val moreTransactionalRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, 
producerEpoch, sequence + 1,
@@ -2601,9 +2601,9 @@ class ReplicaManagerTest {
       // Disable verification
       config.dynamicConfig.initialize(None, None)
       val props = new Properties()
-      
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
+      
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
config.transactionPartitionVerificationEnable == false, "Config did not 
dynamically update.")
+      TestUtils.waitUntilTrue(() => 
config.transactionLogConfig.transactionPartitionVerificationEnable == false, 
"Config did not dynamically update.")
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 04d42dadd75..f8f5eb8b62a 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
 import 
org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, 
ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, 
GroupMetadataValue}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
@@ -93,8 +93,8 @@ class DumpLogSegmentsTest {
       time = time,
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 917f53eed60..d9004824080 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic._
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import kafka.log.{LocalLog, LogLoader, UnifiedLog}
 import kafka.utils.TestUtils.retry
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{LogConfig, 
LogDirFailureChannel, LogSegments, ProducerStateManager, 
ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -134,8 +134,8 @@ class SchedulerTest {
     val logConfig = new LogConfig(new Properties())
     val brokerTopicStats = new BrokerTopicStats
     val maxTransactionTimeoutMs = 5 * 60 * 1000
-    val maxProducerIdExpirationMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
-    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+    val maxProducerIdExpirationMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
+    val producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
     val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d3bd7a59d6b..c3a19864321 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.Utils.formatAddress
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
@@ -1195,8 +1195,8 @@ object TestUtils extends Logging {
                    flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
                    maxTransactionTimeoutMs = 5 * 60 * 1000,
-                   producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 transactionVerificationEnabled),
-                   producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                   producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 transactionVerificationEnabled),
+                   producerIdExpirationCheckIntervalMs = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
                    scheduler = time.scheduler,
                    time = time,
                    brokerTopicStats = new BrokerTopicStats,
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index ea7f3b78ed4..b8479642e72 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
-import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.security.PasswordEncoderConfigs;
@@ -57,8 +57,8 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
             CleanerConfig.CONFIG_DEF,
             LogConfig.SERVER_CONFIG_DEF,
             ShareGroupConfig.CONFIG_DEF,
-            TransactionLogConfigs.CONFIG_DEF,
-            TransactionStateManagerConfigs.CONFIG_DEF,
+            TransactionLogConfig.CONFIG_DEF,
+            TransactionStateManagerConfig.CONFIG_DEF,
             QuorumConfig.CONFIG_DEF,
             MetricConfigs.CONFIG_DEF,
             QuotaConfigs.CONFIG_DEF,
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index 9f5eee05c99..acff9bc6f29 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -59,7 +59,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
 import static 
org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index dc82464107b..f4f03b98330 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.server.config.ConfigType;
 import org.apache.kafka.server.config.ServerConfigs;
@@ -124,7 +124,7 @@ public class EmbeddedKafkaCluster {
         putIfAbsent(brokerConfig, 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
         putIfAbsent(brokerConfig, 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1);
         putIfAbsent(brokerConfig, 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5);
-        putIfAbsent(brokerConfig, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
+        putIfAbsent(brokerConfig, 
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
         putIfAbsent(brokerConfig, 
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
 
         for (int i = 0; i < brokers.length; i++) {
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
similarity index 59%
rename from 
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
rename to 
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
index b80a87c6e86..c8f236f4d6a 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -25,7 +26,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
 
-public final class TransactionLogConfigs {
+public final class TransactionLogConfig {
     // Log-level config and default values
     public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = 
"transaction.state.log.num.partitions";
     public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
@@ -61,15 +62,67 @@ public final class TransactionLogConfigs {
     public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 
600000;
     public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = 
"The interval at which to remove producer IDs that have expired due to 
<code>producer.id.expiration.ms</code> passing.";
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, 
atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
+            .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, 
TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_MIN_ISR_DOC)
+            .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, 
TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
+            .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
+            .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, 
TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_PARTITIONS_DOC)
+            .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
 
-            
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, 
BOOLEAN, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
+            .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
 
-            .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
+            .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, 
PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, 
PRODUCER_ID_EXPIRATION_MS_DOC)
             // Configuration for testing only as default value should be 
sufficient for typical usage
-            
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, 
atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+            .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, 
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+
+    private final AbstractConfig config;
+    private final int transactionTopicMinISR;
+    private final int transactionsLoadBufferSize;
+    private final short transactionTopicReplicationFactor;
+    private final int transactionTopicPartitions;
+    private final int transactionTopicSegmentBytes;
+    private final int producerIdExpirationCheckIntervalMs;
+
+    public TransactionLogConfig(AbstractConfig config) {
+        this.config = config;
+        this.transactionTopicMinISR = 
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
+        this.transactionsLoadBufferSize = 
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
+        this.transactionTopicReplicationFactor = 
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
+        this.transactionTopicPartitions = 
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
+        this.transactionTopicSegmentBytes = 
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
+        this.producerIdExpirationCheckIntervalMs = 
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+    }
+
+    public int transactionTopicMinISR() {
+        return transactionTopicMinISR;
+    }
+
+    public int transactionsLoadBufferSize() {
+        return transactionsLoadBufferSize;
+    }
+
+    public short transactionTopicReplicationFactor() {
+        return transactionTopicReplicationFactor;
+    }
+
+    public int transactionTopicPartitions() {
+        return transactionTopicPartitions;
+    }
+
+    public int transactionTopicSegmentBytes() {
+        return transactionTopicSegmentBytes;
+    }
+
+    public int producerIdExpirationCheckIntervalMs() {
+        return producerIdExpirationCheckIntervalMs;
+    }
+
+    // This is a broker dynamic config used for 
DynamicProducerStateManagerConfig
+    public Boolean transactionPartitionVerificationEnable() {
+        return 
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+    }
+
+    // This is a broker dynamic config used for 
DynamicProducerStateManagerConfig
+    public int producerIdExpirationMs() {
+        return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
+    }
 }
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
similarity index 61%
rename from 
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
rename to 
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
index 4da091afc52..46dfb46d129 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
 import java.util.concurrent.TimeUnit;
@@ -25,7 +26,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 
-public final class TransactionStateManagerConfigs {
+public final class TransactionStateManagerConfig {
     // Transaction management configs and default values
     public static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG = 
"transaction.max.timeout.ms";
     public static final int TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT = (int) 
TimeUnit.MINUTES.toMillis(15);
@@ -49,9 +50,35 @@ public final class TransactionStateManagerConfigs {
     public static final String METRICS_GROUP = 
"transaction-coordinator-metrics";
     public static final String LOAD_TIME_SENSOR = 
"TransactionsPartitionLoadTime";
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, 
INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, 
atLeast(1), HIGH, 
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
-            
.define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, 
TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), 
HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
-            
.define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
 INT, 
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
 atLeast(1), LOW, 
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
-            
.define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 INT, 
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
 atLeast(1), LOW, 
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);
+            .define(TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, 
TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
+            .define(TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, 
TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
+            
.define(TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, 
INT, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, 
atLeast(1), LOW, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
+            
.define(TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
 INT, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, 
atLeast(1), LOW, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);
 
+    private final int transactionalIdExpirationMs;
+    private final int transactionMaxTimeoutMs;
+    private final int transactionAbortTimedOutTransactionCleanupIntervalMs;
+    private final int transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
+
+    public TransactionStateManagerConfig(AbstractConfig config) {
+        transactionalIdExpirationMs = 
config.getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG);
+        transactionMaxTimeoutMs = 
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG);
+        transactionAbortTimedOutTransactionCleanupIntervalMs = 
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG);
+        transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG);
+    }
+    public int transactionalIdExpirationMs() {
+        return transactionalIdExpirationMs;
+    }
+
+    public int transactionMaxTimeoutMs() {
+        return transactionMaxTimeoutMs;
+    }
+
+    public int transactionAbortTimedOutTransactionCleanupIntervalMs() {
+        return transactionAbortTimedOutTransactionCleanupIntervalMs;
+    }
+
+    public int transactionRemoveExpiredTransactionalIdCleanupIntervalMs() {
+        return transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
+    }
 }

Reply via email to