This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d68f8a82c9 MINOR: Move BrokerReconfigurable to the server-common 
module (#19383)
6d68f8a82c9 is described below

commit 6d68f8a82c9c4a9d69a19ee40143f7c7b253dfe6
Author: TengYao Chi <kiting...@gmail.com>
AuthorDate: Mon Apr 7 07:39:01 2025 +0800

    MINOR: Move BrokerReconfigurable to the server-common module (#19383)
    
    This patch moves `BrokerReconfigurable` to the `server-common` module
    and decouples the `TransactionLogConfig` and `KafkaConfig` to unblock
    KAFKA-14485.
    
    Reviewers: PoAn Yang <pay...@apache.org>, TaiJuWu <tjwu1...@gmail.com>,
    Chia-Ping Tsai <chia7...@gmail.com>
---
 checkstyle/import-control-server-common.xml               |  2 +-
 checkstyle/import-control-server.xml                      |  1 +
 .../coordinator/transaction/TransactionCoordinator.scala  | 13 +++++++------
 core/src/main/scala/kafka/log/LogManager.scala            |  6 ++++--
 .../scala/kafka/server/AutoTopicCreationManager.scala     |  6 ++++--
 .../src/main/scala/kafka/server/DynamicBrokerConfig.scala |  7 ++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala        |  3 +--
 core/src/main/scala/kafka/server/ReplicaManager.scala     |  8 ++++++--
 .../kafka/server/metadata/BrokerMetadataPublisher.scala   |  4 +++-
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala |  6 ++++--
 .../org/apache/kafka}/config/BrokerReconfigurable.java    | 15 +++++++--------
 .../apache/kafka/server/config/AbstractKafkaConfig.java   |  2 --
 .../server/config/DynamicProducerStateManagerConfig.java  | 10 ++++++----
 13 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml 
b/checkstyle/import-control-server-common.xml
index 20f6e32fc9a..2b5c9002c9b 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -38,7 +38,7 @@
     <disallow pkg="kafka" />
 
     <!-- anyone can use public classes -->
-    <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.common.security" />
     <allow pkg="org.apache.kafka.common.serialization" />
     <allow pkg="org.apache.kafka.common.utils" />
diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index 05e67a153bd..b45b3c41c27 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -59,6 +59,7 @@
   <allow pkg="org.apache.kafka.metadata" />
 
   <!-- utilities and reusable classes from server-common -->
+  <allow pkg="org.apache.kafka.config"/>
   <allow pkg="org.apache.kafka.queue" />
   <allow pkg="org.apache.kafka.security" />
   <allow pkg="org.apache.kafka.server.common" />
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 697d6a30144..ed8140e596f 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, 
TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
-import org.apache.kafka.coordinator.transaction.ProducerIdManager
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionLogConfig}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.util.Scheduler
@@ -46,13 +46,14 @@ object TransactionCoordinator {
             metadataCache: MetadataCache,
             time: Time): TransactionCoordinator = {
 
+    val transactionLogConfig = new TransactionLogConfig(config)
     val txnConfig = 
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
       config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      config.transactionLogConfig.transactionTopicPartitions,
-      config.transactionLogConfig.transactionTopicReplicationFactor,
-      config.transactionLogConfig.transactionTopicSegmentBytes,
-      config.transactionLogConfig.transactionLoadBufferSize,
-      config.transactionLogConfig.transactionTopicMinISR,
+      transactionLogConfig.transactionTopicPartitions,
+      transactionLogConfig.transactionTopicReplicationFactor,
+      transactionLogConfig.transactionTopicSegmentBytes,
+      transactionLogConfig.transactionLoadBufferSize,
+      transactionLogConfig.transactionTopicMinISR,
       
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
       
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
       config.transactionStateManagerConfig.transaction2PCEnabled,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 2bafe735283..4b255e9a66e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, 
KafkaStorageException, LogDirNotFoundException}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -1548,6 +1549,7 @@ object LogManager {
     val defaultLogConfig = new LogConfig(defaultProps)
 
     val cleanerConfig = LogCleaner.cleanerConfig(config)
+    val transactionLogConfig = new TransactionLogConfig(config)
 
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
@@ -1560,8 +1562,8 @@ object LogManager {
       flushStartOffsetCheckpointMs = 
config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = 
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, 
config.transactionLogConfig.transactionPartitionVerificationEnable),
-      producerIdExpirationCheckIntervalMs = 
config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, 
transactionLogConfig.transactionPartitionVerificationEnable),
+      producerIdExpirationCheckIntervalMs = 
transactionLogConfig.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 85308473bf6..a2c2bd4d80b 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, 
RequestHeader}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
 import scala.collection.{Map, Seq, Set, mutable}
@@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager(
           
.setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
           
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
       case TRANSACTION_STATE_TOPIC_NAME =>
+        val transactionLogConfig = new TransactionLogConfig(config)
         new CreatableTopic()
           .setName(topic)
-          
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
-          
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
+          .setNumPartitions(transactionLogConfig.transactionTopicPartitions)
+          
.setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(
             txnCoordinator.transactionTopicConfigs))
       case SHARE_GROUP_STATE_TOPIC_NAME =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 29a3971d37f..6c4066c9db2 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -35,10 +35,11 @@ import org.apache.kafka.common.metrics.{Metrics, 
MetricsReporter}
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
+import org.apache.kafka.config
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.KafkaRaftClient
-import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
+import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, 
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -323,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     reconfigurables.add(reconfigurable)
   }
 
-  def addBrokerReconfigurable(reconfigurable: 
org.apache.kafka.server.config.BrokerReconfigurable): Unit = {
+  def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): 
Unit = {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
     brokerReconfigurables.add(new BrokerReconfigurable {
       override def reconfigurableConfigs: Set[String] = 
reconfigurable.reconfigurableConfigs().asScala
@@ -617,7 +618,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 }
 
 /**
- * Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead.
+ * Implement [[config.BrokerReconfigurable]] instead.
  */
 trait BrokerReconfigurable {
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2b8be8518b5..87bfd9cd21f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
   def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
 
-  override val transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new 
TransactionStateManagerConfig(this)
   private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionStateManagerConfig: TransactionStateManagerConfig = 
_transactionStateManagerConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9b5c01a85c9..aa08fc93fff 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,6 +47,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
@@ -1038,10 +1039,13 @@ class ReplicaManager(val config: KafkaConfig,
     callback: ((Map[TopicPartition, Errors], Map[TopicPartition, 
VerificationGuard])) => Unit,
     transactionSupportedOperation: TransactionSupportedOperation
   ): Unit = {
+    def transactionPartitionVerificationEnable = {
+      new TransactionLogConfig(config).transactionPartitionVerificationEnable
+    }
     // Skip verification if the request is not transactional or transaction 
verification is disabled.
-    if (transactionalId == null ||
-      (!config.transactionLogConfig.transactionPartitionVerificationEnable && 
!transactionSupportedOperation.supportsEpochBump)
+    if (transactionalId == null
       || addPartitionsToTxnManager.isEmpty
+      || (!transactionSupportedOperation.supportsEpochBump && 
!transactionPartitionVerificationEnable)
     ) {
       callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, 
VerificationGuard]))
       return
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index ead385e7447..de8f16e1e58 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
@@ -331,9 +332,10 @@ class BrokerMetadataPublisher(
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
GroupCoordinator", t)
     }
     try {
+      val transactionLogConfig = new TransactionLogConfig(config)
       // Start the transaction coordinator.
       txnCoordinator.startup(() => metadataCache.numPartitions(
-        
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions))
+        
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
TransactionCoordinator", t)
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5ae1262df40..a8e760c7b7a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2523,7 +2523,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did 
not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => 
transactionLogConfig.transactionPartitionVerificationEnable, "Config did not 
dynamically update.")
 
       // Try to append more records. We don't need to send a request since the 
transaction is already ongoing.
       val moreTransactionalRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, 
producerEpoch, sequence + 1,
@@ -2575,7 +2576,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
!config.transactionLogConfig.transactionPartitionVerificationEnable, "Config 
did not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => 
!transactionLogConfig.transactionPartitionVerificationEnable, "Config did not 
dynamically update.")
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java 
b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
similarity index 88%
rename from 
server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
rename to 
server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
index 2b3fe9dcf34..b7076f05ef3 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
+++ 
b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.server.config;
+package org.apache.kafka.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
 
 import java.util.Set;
 
@@ -27,13 +29,10 @@ import java.util.Set;
  * The reconfiguration process follows three steps:
  * <ol>
  *   <li>Determining which configurations can be dynamically updated via 
{@link #reconfigurableConfigs()}</li>
- *   <li>Validating the new configuration before applying it via {@link 
#validateReconfiguration(AbstractKafkaConfig)}</li>
- *   <li>Applying the new configuration via {@link 
#reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
+ *   <li>Validating the new configuration before applying it via {@link 
#validateReconfiguration(AbstractConfig)}</li>
+ *   <li>Applying the new configuration via {@link 
#reconfigure(AbstractConfig, AbstractConfig)}</li>
  * </ol>
  * <strong>Note: Since Kafka is eliminating Scala, developers should implement 
this interface instead of {@link kafka.server.BrokerReconfigurable}</strong>
- *
- *
- * @see AbstractKafkaConfig
  */
 public interface BrokerReconfigurable {
     /**
@@ -55,7 +54,7 @@ public interface BrokerReconfigurable {
      *
      * @param newConfig the new configuration to validate
      */
-    void validateReconfiguration(AbstractKafkaConfig newConfig);
+    void validateReconfiguration(AbstractConfig newConfig);
 
     /**
      * Applies the new configuration.
@@ -65,5 +64,5 @@ public interface BrokerReconfigurable {
      * @param oldConfig the previous configuration
      * @param newConfig the new configuration to apply
      */
-    void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig);
+    void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index fc6906b96d3..87bf18a412f 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -83,6 +83,4 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
-
-    public abstract TransactionLogConfig transactionLogConfig();
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
index 4158194f000..de9289e09a7 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.config.BrokerReconfigurable;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
 
@@ -39,16 +41,16 @@ public class DynamicProducerStateManagerConfig implements 
BrokerReconfigurable {
     }
 
     @Override
-    public void validateReconfiguration(AbstractKafkaConfig newConfig) {
-        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+    public void validateReconfiguration(AbstractConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(newConfig);
         if (transactionLogConfig.producerIdExpirationMs() < 0)
             throw new 
ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot 
be less than 0, current value is " +
                                       
producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + 
transactionLogConfig.producerIdExpirationMs());
     }
 
     @Override
-    public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig) {
-        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+    public void reconfigure(AbstractConfig oldConfig, AbstractConfig 
newConfig) {
+        TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(newConfig);
         if (producerStateManagerConfig.producerIdExpirationMs() != 
transactionLogConfig.producerIdExpirationMs()) {
             log.info("Reconfigure {} from {} to {}",
                 TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,

Reply via email to