This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f59d829381a KAFKA-15853 Move TransactionLogConfig and
TransactionStateManagerConfig getters out of KafkaConfig (#16665)
f59d829381a is described below
commit f59d829381a7021971b0b2835fbbacb4abd996a5
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Tue Sep 3 11:24:12 2024 +0100
KAFKA-15853 Move TransactionLogConfig and TransactionStateManagerConfig
getters out of KafkaConfig (#16665)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control.xml | 2 +-
.../transaction/TransactionCoordinator.scala | 18 +++---
.../transaction/TransactionStateManager.scala | 26 ++++----
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
.../kafka/server/AutoTopicCreationManager.scala | 4 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 20 +++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 23 ++-----
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../server/metadata/BrokerMetadataPublisher.scala | 2 +-
.../admin/AdminFenceProducersIntegrationTest.scala | 10 +--
.../api/AbstractAuthorizerIntegrationTest.scala | 8 +--
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 8 +--
.../kafka/api/ProducerIdExpirationTest.scala | 20 +++---
.../SaslClientsWithInvalidCredentialsTest.scala | 6 +-
.../kafka/api/TransactionsBounceTest.scala | 6 +-
.../kafka/api/TransactionsExpirationTest.scala | 18 +++---
.../integration/kafka/api/TransactionsTest.scala | 10 +--
.../api/TransactionsWithMaxInFlightOneTest.scala | 10 +--
.../server/DynamicBrokerReconfigurationTest.scala | 6 +-
.../src/test/scala/other/kafka/StressTestLog.scala | 6 +-
.../scala/other/kafka/TestLinearWriteSpeed.scala | 6 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 4 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 4 +-
.../transaction/TransactionCoordinatorTest.scala | 12 ++--
.../log/AbstractLogCleanerIntegrationTest.scala | 6 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 6 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 10 +--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 8 +--
.../scala/unit/kafka/log/LogConcurrencyTest.scala | 6 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 12 ++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 10 +--
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 4 +-
.../test/scala/unit/kafka/log/LogTestUtils.scala | 6 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 6 +-
.../server/AutoTopicCreationManagerTest.scala | 8 +--
.../scala/unit/kafka/server/KafkaApisTest.scala | 12 ++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 16 ++---
.../unit/kafka/server/ReplicaManagerTest.scala | 10 +--
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 6 +-
.../scala/unit/kafka/utils/SchedulerTest.scala | 6 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 6 +-
.../kafka/server/config/AbstractKafkaConfig.java | 8 +--
.../internals/log/ProducerStateManagerTest.java | 2 +-
.../integration/utils/EmbeddedKafkaCluster.java | 4 +-
...onLogConfigs.java => TransactionLogConfig.java} | 71 +++++++++++++++++++---
...igs.java => TransactionStateManagerConfig.java} | 37 +++++++++--
47 files changed, 284 insertions(+), 215 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 285163816f2..f7d7ef798bd 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -406,7 +406,7 @@
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
- <allow
class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
+ <allow
class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.network" />
</subpackage>
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index b3d5d9ef480..586ba1b4f98 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -44,15 +44,15 @@ object TransactionCoordinator {
metadataCache: MetadataCache,
time: Time): TransactionCoordinator = {
- val txnConfig = TransactionConfig(config.transactionalIdExpirationMs,
- config.transactionMaxTimeoutMs,
- config.transactionTopicPartitions,
- config.transactionTopicReplicationFactor,
- config.transactionTopicSegmentBytes,
- config.transactionsLoadBufferSize,
- config.transactionTopicMinISR,
- config.transactionAbortTimedOutTransactionCleanupIntervalMs,
- config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
+ val txnConfig =
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
+ config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+ config.transactionLogConfig.transactionTopicPartitions,
+ config.transactionLogConfig.transactionTopicReplicationFactor,
+ config.transactionLogConfig.transactionTopicSegmentBytes,
+ config.transactionLogConfig.transactionsLoadBufferSize,
+ config.transactionLogConfig.transactionTopicMinISR,
+
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
+
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)
val txnStateManager = new TransactionStateManager(config.brokerId,
scheduler, replicaManager, metadataCache, txnConfig,
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2894bbdd94b..3daf966d8c3 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -35,7 +35,7 @@ import
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
@@ -92,13 +92,13 @@ class TransactionStateManager(brokerId: Int,
@volatile private var transactionTopicPartitionCount: Int = _
/** setup metrics*/
- private val partitionLoadSensor =
metrics.sensor(TransactionStateManagerConfigs.LOAD_TIME_SENSOR)
+ private val partitionLoadSensor =
metrics.sensor(TransactionStateManagerConfig.LOAD_TIME_SENSOR)
partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
- TransactionStateManagerConfigs.METRICS_GROUP,
+ TransactionStateManagerConfig.METRICS_GROUP,
"The max time it took to load the partitions in the last 30sec"), new
Max())
partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
- TransactionStateManagerConfigs.METRICS_GROUP,
+ TransactionStateManagerConfig.METRICS_GROUP,
"The avg time it took to load the partitions in the last 30sec"), new
Avg())
private[transaction] def usesFlexibleRecords(): Boolean = {
@@ -809,15 +809,15 @@ private[transaction] case class
TxnMetadataCacheEntry(coordinatorEpoch: Int,
private[transaction] case class
CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int,
transactionMetadata: TransactionMetadata)
-private[transaction] case class TransactionConfig(transactionalIdExpirationMs:
Int = TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
- transactionMaxTimeoutMs: Int
= TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
- transactionLogNumPartitions:
Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
-
transactionLogReplicationFactor: Short =
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
- transactionLogSegmentBytes:
Int = TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
-
transactionLogLoadBufferSize: Int =
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
-
transactionLogMinInsyncReplicas: Int =
TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
-
abortTimedOutTransactionsIntervalMs: Int =
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
-
removeExpiredTransactionalIdsIntervalMs: Int =
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
+private[transaction] case class TransactionConfig(transactionalIdExpirationMs:
Int = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
+ transactionMaxTimeoutMs: Int
= TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT,
+ transactionLogNumPartitions:
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT,
+
transactionLogReplicationFactor: Short =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+ transactionLogSegmentBytes:
Int = TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT,
+
transactionLogLoadBufferSize: Int =
TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT,
+
transactionLogMinInsyncReplicas: Int =
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
+
abortTimedOutTransactionsIntervalMs: Int =
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
+
removeExpiredTransactionalIdsIntervalMs: Int =
TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
requestTimeoutMs: Int =
ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)
case class TransactionalIdAndProducerIdEpoch(transactionalId: String,
producerId: Long, producerEpoch: Short) {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index e9d782bba9d..3a447e22ef8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1588,9 +1588,9 @@ object LogManager {
flushRecoveryOffsetCheckpointMs =
config.logFlushOffsetCheckpointIntervalMs,
flushStartOffsetCheckpointMs =
config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
- maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
- producerStateManagerConfig = new
ProducerStateManagerConfig(config.producerIdExpirationMs,
config.transactionPartitionVerificationEnable),
- producerIdExpirationCheckIntervalMs =
config.producerIdExpirationCheckIntervalMs,
+ maxTransactionTimeoutMs =
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs,
config.transactionLogConfig.transactionPartitionVerificationEnable),
+ producerIdExpirationCheckIntervalMs =
config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index a5c0146f84c..d0efdb910b8 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -240,8 +240,8 @@ class DefaultAutoTopicCreationManager(
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)
- .setNumPartitions(config.transactionTopicPartitions)
- .setReplicationFactor(config.transactionTopicReplicationFactor)
+
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
+
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(
txnCoordinator.transactionTopicConfigs))
case topicName =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 23c99e5bcb3..a1d03cf9bce 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
@@ -87,7 +87,7 @@ import scala.jdk.CollectionConverters._
object DynamicBrokerConfig {
private[server] val DynamicSecurityConfigs =
SslConfigs.RECONFIGURABLE_CONFIGS.asScala
- private[server] val DynamicProducerStateManagerConfig =
Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG,
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
+ private[server] val DynamicProducerStateManagerConfig =
Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
@@ -1128,19 +1128,19 @@ class DynamicListenerConfig(server: KafkaBroker)
extends BrokerReconfigurable wi
class DynamicProducerStateManagerConfig(val producerStateManagerConfig:
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
- if (producerStateManagerConfig.producerIdExpirationMs !=
newConfig.producerIdExpirationMs) {
- info(s"Reconfigure
${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from
${producerStateManagerConfig.producerIdExpirationMs} to
${newConfig.producerIdExpirationMs}")
-
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
+ if (producerStateManagerConfig.producerIdExpirationMs !=
newConfig.transactionLogConfig.producerIdExpirationMs) {
+ info(s"Reconfigure
${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from
${producerStateManagerConfig.producerIdExpirationMs} to
${newConfig.transactionLogConfig.producerIdExpirationMs}")
+
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs)
}
- if (producerStateManagerConfig.transactionVerificationEnabled !=
newConfig.transactionPartitionVerificationEnable) {
- info(s"Reconfigure
${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from
${producerStateManagerConfig.transactionVerificationEnabled} to
${newConfig.transactionPartitionVerificationEnable}")
-
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
+ if (producerStateManagerConfig.transactionVerificationEnabled !=
newConfig.transactionLogConfig.transactionPartitionVerificationEnable) {
+ info(s"Reconfigure
${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from
${producerStateManagerConfig.transactionVerificationEnabled} to
${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}")
+
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable)
}
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
- if (newConfig.producerIdExpirationMs < 0)
- throw new
ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG}
cannot be less than 0, current value is
${producerStateManagerConfig.producerIdExpirationMs}, and new value is
${newConfig.producerIdExpirationMs}")
+ if (newConfig.transactionLogConfig.producerIdExpirationMs < 0)
+ throw new
ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG}
cannot be less than 0, current value is
${producerStateManagerConfig.producerIdExpirationMs}, and new value is
${newConfig.transactionLogConfig.producerIdExpirationMs}")
}
override def reconfigurableConfigs: Set[String] =
DynamicProducerStateManagerConfig
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5f099b31724..46bebf3c116 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -238,6 +238,11 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
private val _shareGroupConfig = new ShareGroupConfig(this)
def shareGroupConfig: ShareGroupConfig = _shareGroupConfig
+ private val _transactionLogConfig = new TransactionLogConfig(this)
+ private val _transactionStateManagerConfig = new
TransactionStateManagerConfig(this)
+ def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
+ def transactionStateManagerConfig: TransactionStateManagerConfig =
_transactionStateManagerConfig
+
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey:
String): Boolean = {
// Use the system property if it exists and the Kafka config value was
defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to
true/false (Boolean)
@@ -596,22 +601,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
protocols
}
- /** ********* Transaction management configuration ***********/
- val transactionalIdExpirationMs =
getInt(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG)
- val transactionMaxTimeoutMs =
getInt(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG)
- val transactionTopicMinISR =
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG)
- val transactionsLoadBufferSize =
getInt(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG)
- val transactionTopicReplicationFactor =
getShort(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG)
- val transactionTopicPartitions =
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG)
- val transactionTopicSegmentBytes =
getInt(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG)
- val transactionAbortTimedOutTransactionCleanupIntervalMs =
getInt(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG)
- val transactionRemoveExpiredTransactionalIdCleanupIntervalMs =
getInt(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG)
-
- def transactionPartitionVerificationEnable =
getBoolean(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
-
- def producerIdExpirationMs =
getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG)
- val producerIdExpirationCheckIntervalMs =
getInt(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG)
-
/** ********* Metric Configuration **************/
val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG)
val metricSampleWindowMs =
getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7edd80467ed..a34c075822f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -525,7 +525,7 @@ class KafkaServer(
transactionCoordinator = TransactionCoordinator(config,
replicaManager, new KafkaScheduler(1, true, "transaction-log-manager-"),
() => producerIdManager, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup(
- () =>
zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
+ () =>
zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))
/* start auto topic creation manager */
this.autoTopicCreationManager = AutoTopicCreationManager(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 09c3b1dc4d4..e9c114567d9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
// Skip verification if the request is not transactional or transaction
verification is disabled.
if (transactionalId == null ||
- !config.transactionPartitionVerificationEnable
+ !config.transactionLogConfig.transactionPartitionVerificationEnable
|| addPartitionsToTxnManager.isEmpty
) {
callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition,
VerificationGuard]))
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 680b8833581..386d355a49c 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -308,7 +308,7 @@ class BrokerMetadataPublisher(
try {
// Start the transaction coordinator.
txnCoordinator.startup(() => metadataCache.numPartitions(
-
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
+
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogConfig.transactionTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting
TransactionCoordinator", t)
}
diff --git
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
index 5754dd12787..2e2847b3f4f 100644
---
a/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException,
ProducerFencedException, TimeoutException}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
@@ -63,10 +63,10 @@ class AdminFenceProducersIntegrationTest extends
IntegrationTestHarness {
val props = new Properties()
props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
// Set a smaller value for the number of partitions for speed
- props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
- props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
-
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"2000")
+ props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
+
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
+ props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
+
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"2000")
props
}
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 5e181b9e823..42addf1dd54 100644
---
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -29,7 +29,7 @@ import
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRA
import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -114,9 +114,9 @@ class AbstractAuthorizerIntegrationTest extends
BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
-
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
"1")
+ properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
+
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
+ properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
properties.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[PrincipalBuilder].getName)
}
diff --git
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index e053966c079..7723c38ac5e 100644
---
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
@@ -91,9 +91,9 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
-
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
- properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
"1")
+ properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
+
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
+ properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[GroupPrincipalBuilder].getName)
}
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index f9e5d791695..69184a5e158 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
@@ -204,7 +204,7 @@ class ProducerIdExpirationTest extends
KafkaServerTestHarness {
}
private def producerIdExpirationConfig(configValue: String):
util.Map[ConfigResource, util.Collection[AlterConfigOp]] = {
- val producerIdCfg = new
ConfigEntry(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
+ val producerIdCfg = new
ConfigEntry(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg,
AlterConfigOp.OpType.SET))
Collections.singletonMap(configResource, configs)
}
@@ -230,18 +230,18 @@ class ProducerIdExpirationTest extends
KafkaServerTestHarness {
// Set a smaller value for the number of partitions for the
__consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long.
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
- serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
+
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
true.toString)
serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
false.toString)
serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
false.toString)
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
"5000")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
"500")
- serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG,
"10000")
-
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
"500")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
"5000")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
"500")
+ serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
"10000")
+
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
"500")
serverProps
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index f8440226caa..7190c939b2f 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -27,8 +27,8 @@ import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -43,8 +43,8 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
val brokerCount = 1
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
-
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
-
this.serverConfig.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
"1")
+
this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
+
this.serverConfig.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
"1")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
val topic = "topic"
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 3cb508958a4..ee47dcae05f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -24,8 +24,8 @@ import org.apache.kafka.clients.consumer.{Consumer,
ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
@@ -57,8 +57,8 @@ class TransactionsBounceTest extends IntegrationTestHarness {
overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, 2.toString)
overridingProps.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
"10") // set small enough session timeout
overridingProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
overridingProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
3.toString)
+
overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
+
overridingProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
3.toString)
// This is the one of the few tests we currently allow to preallocate ports,
despite the fact that this can result in transient
// failures due to ports getting reused. We can't use random ports because
of bad behavior that can result from bouncing
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index a31385224fd..6063d522992 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions.assertEquals
@@ -205,18 +205,18 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
// Set a smaller value for the number of partitions for the
__consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long.
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
- serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
+
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
true.toString)
serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
false.toString)
serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
false.toString)
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
"10000")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
"500")
- serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG,
"5000")
-
serverProps.put(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
"500")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
"10000")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
"500")
+ serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
"5000")
+
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
"500")
serverProps
}
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 9c8b93012a4..0a64ec9ed51 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer,
ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{InvalidProducerEpochException,
ProducerFencedException, TimeoutException}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -63,14 +63,14 @@ class TransactionsTest extends IntegrationTestHarness {
props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
// Set a smaller value for the number of partitions for the
__consumer_offsets topic + // so that the creation of that topic/partition(s)
and subsequent leader assignment doesn't take relatively long
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
1.toString)
- props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
-
props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
- props.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
+ props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
3.toString)
+
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toString)
+ props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
2.toString)
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString)
props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
false.toString)
props.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
false.toString)
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
props.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
+
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
props
}
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index ae73cde5e84..41674852549 100644
---
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -107,14 +107,14 @@ class TransactionsWithMaxInFlightOneTest extends
KafkaServerTestHarness {
serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false.toString)
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
1.toString)
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
-
serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
- serverProps.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
1.toString)
+
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
1.toString)
+ serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
1.toString)
serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
true.toString)
serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
false.toString)
serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG,
false.toString)
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
-
serverProps.put(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
+
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
"200")
serverProps
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 01aef960dd4..03a56d9a9e0 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -61,7 +61,7 @@ import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs,
ServerConfigs, ServerLogConfigs, ZkConfigs}
@@ -1274,7 +1274,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
// Dynamically turn verification off.
val configPrefix = listenerPrefix(SecureExternal)
val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS,
configPrefix)
-
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"false")
+
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"false")
alterConfigsUsingConfigCommand(updatedProps)
verifyConfiguration(false)
@@ -1286,7 +1286,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
verifyConfiguration(false)
// Turn verification back on.
-
updatedProps.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"true")
+
updatedProps.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"true")
alterConfigsUsingConfigCommand(updatedProps)
verifyConfiguration(true)
}
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala
b/core/src/test/scala/other/kafka/StressTestLog.scala
index 78fda569a36..a604884a06e 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -25,7 +25,7 @@ import
org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -52,8 +52,8 @@ object StressTestLog {
scheduler = time.scheduler,
time = time,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 0811cec64c5..adef2c63809 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.compress.{Compression,
GzipCompression, Lz4Compre
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.storage.internals.log.{LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -235,8 +235,8 @@ object TestLinearWriteSpeed {
brokerTopicStats = new BrokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 1f70ccfccd3..9711913036b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords,
SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
@@ -304,7 +304,7 @@ class PartitionLockTest extends Logging {
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel,
log.config.recordVersion, "", None, mockTime.scheduler)
val maxTransactionTimeout = 5 * 60 * 1000
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1d5e2677c67..27e343d5f03 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -439,7 +439,7 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel,
log.config.recordVersion, "", None, time.scheduler)
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
true)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
true)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 8ba9b1a8f79..584a49bbc2c 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse,
TransactionResult}
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
-import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig
import org.apache.kafka.server.util.MockScheduler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -1004,7 +1004,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
val expectedTransition = TxnTransitMetadata(producerId, producerId,
(producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH,
- txnTimeoutMs, PrepareAbort, partitions.toSet, now, now +
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+ txnTimeoutMs, PrepareAbort, partitions.toSet, now, now +
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@@ -1015,7 +1015,7 @@ class TransactionCoordinatorTest {
).thenAnswer(_ => {})
coordinator.startup(() => transactionStatePartitionCount, false)
-
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()
verify(transactionManager,
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
@@ -1064,7 +1064,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
metadata))))
coordinator.startup(() => transactionStatePartitionCount, false)
-
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
@@ -1088,7 +1088,7 @@ class TransactionCoordinatorTest {
val bumpedEpoch = (producerEpoch + 1).toShort
val expectedTransition = TxnTransitMetadata(producerId, producerId,
bumpedEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
- PrepareAbort, partitions.toSet, now, now +
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+ PrepareAbort, partitions.toSet, now, now +
TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
when(transactionManager.appendTransactionToLog(ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
@@ -1099,7 +1099,7 @@ class TransactionCoordinatorTest {
).thenAnswer(_ =>
capturedErrorsCallback.getValue.apply(Errors.NOT_ENOUGH_REPLICAS))
coordinator.startup(() => transactionStatePartitionCount, false)
-
time.sleep(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
+
time.sleep(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT)
scheduler.tick()
verify(transactionManager).timedOutTransactions()
diff --git
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index ff94377b2f1..87470168527 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -114,8 +114,8 @@ abstract class AbstractLogCleanerIntegrationTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 1495d688d5c..36a08b875f4 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -65,8 +65,8 @@ class BrokerCompressionTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 978b856a9fa..44a8a90a39c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogDirFailureChannel, LogSegment, LogSegments, LogStartOffsetIncrementReason,
ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -56,7 +56,7 @@ class LogCleanerManagerTest extends Logging {
val logConfig: LogConfig = new LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC
2014 for `currentTimeMs`
val offset = 999
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
val cleanerCheckpoints: mutable.Map[TopicPartition, Long] =
mutable.Map[TopicPartition, Long]()
@@ -107,7 +107,7 @@ class LogCleanerManagerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val config = createLowRetentionLogConfig(logSegmentSize,
TopicConfig.CLEANUP_POLICY_COMPACT)
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "",
None, time.scheduler)
@@ -817,7 +817,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)
@@ -871,7 +871,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f8350cf7427..8db79e8186b 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap,
ProducerStateManager, ProducerStateManagerConfig}
@@ -64,7 +64,7 @@ class LogCleanerTest extends Logging {
val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler",
"entries", time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
@AfterEach
def teardown(): Unit = {
@@ -187,7 +187,7 @@ class LogCleanerTest extends Logging {
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, config.recordVersion, "",
None, time.scheduler)
@@ -2057,7 +2057,7 @@ class LogCleanerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 0793768bfbd..126ebdd4e69 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -152,8 +152,8 @@ class LogConcurrencyTest {
brokerTopicStats = brokerTopicStats,
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 5c0bcd3b4d7..10b7ba4a507 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch,
MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler}
@@ -60,8 +60,8 @@ class LogLoaderTest {
var config: KafkaConfig = _
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
- val producerStateManagerConfig: ProducerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
- val producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val producerStateManagerConfig: ProducerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
+ val producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
var logsToClose: Seq[UnifiedLog] = Seq()
@@ -103,7 +103,7 @@ class LogLoaderTest {
val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
// Create a LogManager with some overridden methods to facilitate
interception of clean shutdown
// flag and to inject an error
@@ -353,7 +353,7 @@ class LogLoaderTest {
def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
// Intercept all segment read calls
@@ -512,7 +512,7 @@ class LogLoaderTest {
firstAppendTimestamp, coordinatorEpoch = coordinatorEpoch)
assertEquals(firstAppendTimestamp,
log.producerStateManager.lastEntry(producerId).get.lastTimestamp)
- val maxProducerIdExpirationMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
+ val maxProducerIdExpirationMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
mockTime.sleep(maxProducerIdExpirationMs)
assertEquals(Optional.empty(),
log.producerStateManager.lastEntry(producerId))
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 9e96ec16bdc..2d5d6a849b8 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -28,7 +28,7 @@ import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrTopic
import org.apache.kafka.common.requests.{AbstractControlRequest,
LeaderAndIsrRequest}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition,
TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{TopicImage, TopicsImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
@@ -829,7 +829,7 @@ class LogManagerTest {
val segmentBytes = 1024
val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats,
time.scheduler, time, 0, 0,
- 5 * 60 * 1000, new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false), TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT)
+ 5 * 60 * 1000, new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false), TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT)
assertTrue(expectedSegmentsPerLog > 0)
// calculate numMessages to append to logs. It'll create
"expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -965,7 +965,7 @@ class LogManagerTest {
recoveryPoint = 0,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 *
1000, false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = mock(classOf[Scheduler]),
time = mockTime,
brokerTopicStats = mockBrokerTopicStats,
@@ -1375,8 +1375,8 @@ class LogManagerTest {
flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = scheduler,
time = Time.SYSTEM,
brokerTopicStats = new BrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index e2272941ab3..a715bcc1ba9 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
@@ -640,7 +640,7 @@ class LogSegmentTest {
topicPartition,
logDir,
5 * 60 * 1000,
- new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
new MockTime()
)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 28037861253..12562e4aac3 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals,
assertFalse}
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -99,8 +99,8 @@ object LogTestUtils {
logStartOffset: Long = 0L,
recoveryPoint: Long = 0L,
maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
- producerStateManagerConfig: ProducerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
+ producerStateManagerConfig: ProducerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 3c5a8ab468b..1c617c20d36 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -32,7 +32,7 @@ import
org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest,
ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
@@ -66,7 +66,7 @@ class UnifiedLogTest {
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val mockTime = new MockTime()
var logsToClose: Seq[UnifiedLog] = Seq()
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
def metricsKeySet =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@BeforeEach
@@ -4452,7 +4452,7 @@ class UnifiedLogTest {
time: Time = mockTime,
maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
producerStateManagerConfig: ProducerStateManagerConfig
= producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 9732b0c025d..dbd7febaaed 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
@@ -69,10 +69,10 @@ class AutoTopicCreationManagerTest {
props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
-
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
-
props.setProperty(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
config = KafkaConfig.fromProps(props)
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
@@ -358,7 +358,7 @@ class AutoTopicCreationManagerTest {
case Topic.GROUP_METADATA_TOPIC_NAME => getNewTopic(topicName,
numPartitions =
config.groupCoordinatorConfig.offsetsTopicPartitions, replicationFactor =
config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
case Topic.TRANSACTION_STATE_TOPIC_NAME => getNewTopic(topicName,
- numPartitions = config.transactionTopicPartitions, replicationFactor
= config.transactionTopicReplicationFactor)
+ numPartitions =
config.transactionLogConfig.transactionTopicPartitions, replicationFactor =
config.transactionLogConfig.transactionTopicReplicationFactor)
}
} else {
getNewTopic(topicName)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7bba1dc7e04..1b685e4265d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
@@ -1340,8 +1340,8 @@ class KafkaApisTest extends Logging {
groupId, AuthorizationResult.ALLOWED)
Topic.GROUP_METADATA_TOPIC_NAME
case CoordinatorType.TRANSACTION =>
-
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
-
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
+
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
+
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE,
ResourceType.TRANSACTIONAL_ID,
groupId, AuthorizationResult.ALLOWED)
@@ -1459,8 +1459,8 @@ class KafkaApisTest extends Logging {
true
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
-
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
-
topicConfigOverride.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
+
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
numBrokersNeeded.toString)
+
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
numBrokersNeeded.toString)
when(txnCoordinator.transactionTopicConfigs).thenReturn(new
Properties)
true
case _ =>
@@ -8880,7 +8880,7 @@ class KafkaApisTest extends Logging {
).build()
val requestChannelRequest = buildRequest(offsetCommitRequest)
-
+
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller,
None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index f120784870a..b056c19b3f1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -34,7 +34,7 @@ import
org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs,
TransactionStateManagerConfigs}
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.PasswordEncoderConfigs
@@ -982,13 +982,13 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
- case
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
- case
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case
TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
+ case TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 29c4c602c17..fa77e38c7c0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -51,7 +51,7 @@ import
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
@@ -2549,9 +2549,9 @@ class ReplicaManagerTest {
// Dynamically enable verification.
config.dynamicConfig.initialize(None, None)
val props = new Properties()
-
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"true")
+
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"true")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
- TestUtils.waitUntilTrue(() =>
config.transactionPartitionVerificationEnable == true, "Config did not
dynamically update.")
+ TestUtils.waitUntilTrue(() =>
config.transactionLogConfig.transactionPartitionVerificationEnable == true,
"Config did not dynamically update.")
// Try to append more records. We don't need to send a request since the
transaction is already ongoing.
val moreTransactionalRecords =
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId,
producerEpoch, sequence + 1,
@@ -2601,9 +2601,9 @@ class ReplicaManagerTest {
// Disable verification
config.dynamicConfig.initialize(None, None)
val props = new Properties()
-
props.put(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"false")
+
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
"false")
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
- TestUtils.waitUntilTrue(() =>
config.transactionPartitionVerificationEnable == false, "Config did not
dynamically update.")
+ TestUtils.waitUntilTrue(() =>
config.transactionLogConfig.transactionPartitionVerificationEnable == false,
"Config did not dynamically update.")
// Confirm we did not write to the log and instead returned error.
val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue()
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 04d42dadd75..f8f5eb8b62a 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import
org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue,
ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey,
GroupMetadataValue}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
@@ -93,8 +93,8 @@ class DumpLogSegmentsTest {
time = time,
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 917f53eed60..d9004824080 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic._
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import kafka.log.{LocalLog, LogLoader, UnifiedLog}
import kafka.utils.TestUtils.retry
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{LogConfig,
LogDirFailureChannel, LogSegments, ProducerStateManager,
ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -134,8 +134,8 @@ class SchedulerTest {
val logConfig = new LogConfig(new Properties())
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs = 5 * 60 * 1000
- val maxProducerIdExpirationMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT
- val producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
+ val maxProducerIdExpirationMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT
+ val producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d3bd7a59d6b..c3a19864321 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
@@ -1195,8 +1195,8 @@ object TestUtils extends Logging {
flushStartOffsetCheckpointMs = 10000L,
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = 5 * 60 * 1000,
- producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
transactionVerificationEnabled),
- producerIdExpirationCheckIntervalMs =
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
transactionVerificationEnabled),
+ producerIdExpirationCheckIntervalMs =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats,
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index ea7f3b78ed4..b8479642e72 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
-import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.security.PasswordEncoderConfigs;
@@ -57,8 +57,8 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
CleanerConfig.CONFIG_DEF,
LogConfig.SERVER_CONFIG_DEF,
ShareGroupConfig.CONFIG_DEF,
- TransactionLogConfigs.CONFIG_DEF,
- TransactionStateManagerConfigs.CONFIG_DEF,
+ TransactionLogConfig.CONFIG_DEF,
+ TransactionStateManagerConfig.CONFIG_DEF,
QuorumConfig.CONFIG_DEF,
MetricConfigs.CONFIG_DEF,
QuotaConfigs.CONFIG_DEF,
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index 9f5eee05c99..acff9bc6f29 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -59,7 +59,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
-import static
org.apache.kafka.coordinator.transaction.TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
import static
org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index dc82464107b..f4f03b98330 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
-import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.config.ServerConfigs;
@@ -124,7 +124,7 @@ public class EmbeddedKafkaCluster {
putIfAbsent(brokerConfig,
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
putIfAbsent(brokerConfig,
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1);
putIfAbsent(brokerConfig,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5);
- putIfAbsent(brokerConfig,
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
+ putIfAbsent(brokerConfig,
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
putIfAbsent(brokerConfig,
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
for (int i = 0; i < brokers.length; i++) {
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
similarity index 59%
rename from
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
rename to
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
index b80a87c6e86..c8f236f4d6a 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigs.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -25,7 +26,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
-public final class TransactionLogConfigs {
+public final class TransactionLogConfig {
// Log-level config and default values
public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG =
"transaction.state.log.num.partitions";
public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
@@ -61,15 +62,67 @@ public final class TransactionLogConfigs {
public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT =
600000;
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC =
"The interval at which to remove producer IDs that have expired due to
<code>producer.id.expiration.ms</code> passing.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1),
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1),
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
+ .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT,
TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_MIN_ISR_DOC)
+ .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT,
TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
+ .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
+ .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT,
TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_PARTITIONS_DOC)
+ .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
-
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
BOOLEAN,
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW,
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
+ .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN,
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW,
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
- .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG,
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW,
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
+ .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT,
PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW,
PRODUCER_ID_EXPIRATION_MS_DOC)
// Configuration for testing only as default value should be sufficient for typical usage
-
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
atLeast(1), LOW,
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+ .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+
+ private final AbstractConfig config;
+ private final int transactionTopicMinISR;
+ private final int transactionsLoadBufferSize;
+ private final short transactionTopicReplicationFactor;
+ private final int transactionTopicPartitions;
+ private final int transactionTopicSegmentBytes;
+ private final int producerIdExpirationCheckIntervalMs;
+
+ public TransactionLogConfig(AbstractConfig config) {
+ this.config = config;
+ this.transactionTopicMinISR =
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
+ this.transactionsLoadBufferSize =
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
+ this.transactionTopicReplicationFactor =
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
+ this.transactionTopicPartitions =
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
+ this.transactionTopicSegmentBytes =
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
+ this.producerIdExpirationCheckIntervalMs =
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+ }
+
+ public int transactionTopicMinISR() {
+ return transactionTopicMinISR;
+ }
+
+ public int transactionsLoadBufferSize() {
+ return transactionsLoadBufferSize;
+ }
+
+ public short transactionTopicReplicationFactor() {
+ return transactionTopicReplicationFactor;
+ }
+
+ public int transactionTopicPartitions() {
+ return transactionTopicPartitions;
+ }
+
+ public int transactionTopicSegmentBytes() {
+ return transactionTopicSegmentBytes;
+ }
+
+ public int producerIdExpirationCheckIntervalMs() {
+ return producerIdExpirationCheckIntervalMs;
+ }
+
+ // This is a broker dynamic config used for DynamicProducerStateManagerConfig
+ public Boolean transactionPartitionVerificationEnable() {
+ return
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+ }
+
+ // This is a broker dynamic config used for DynamicProducerStateManagerConfig
+ public int producerIdExpirationMs() {
+ return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
+ }
}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
similarity index 61%
rename from
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
rename to
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
index 4da091afc52..46dfb46d129 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigs.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import java.util.concurrent.TimeUnit;
@@ -25,7 +26,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
-public final class TransactionStateManagerConfigs {
+public final class TransactionStateManagerConfig {
// Transaction management configs and default values
public static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG =
"transaction.max.timeout.ms";
public static final int TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT = (int)
TimeUnit.MINUTES.toMillis(15);
@@ -49,9 +50,35 @@ public final class TransactionStateManagerConfigs {
public static final String METRICS_GROUP =
"transaction-coordinator-metrics";
public static final String LOAD_TIME_SENSOR =
"TransactionsPartitionLoadTime";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
-
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
atLeast(1), HIGH,
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
-
.define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT,
TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1),
HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
-
.define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
INT,
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
atLeast(1), LOW,
TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
-
.define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
INT,
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
atLeast(1), LOW,
TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);
+ .define(TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT,
TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH,
TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
+ .define(TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT,
TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
+
.define(TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG,
INT, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
atLeast(1), LOW, TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
+
.define(TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG,
INT, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
atLeast(1), LOW, TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);
+ private final int transactionalIdExpirationMs;
+ private final int transactionMaxTimeoutMs;
+ private final int transactionAbortTimedOutTransactionCleanupIntervalMs;
+ private final int transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
+
+ public TransactionStateManagerConfig(AbstractConfig config) {
+ transactionalIdExpirationMs =
config.getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG);
+ transactionMaxTimeoutMs =
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG);
+ transactionAbortTimedOutTransactionCleanupIntervalMs =
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG);
+ transactionRemoveExpiredTransactionalIdCleanupIntervalMs =
config.getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG);
+ }
+ public int transactionalIdExpirationMs() {
+ return transactionalIdExpirationMs;
+ }
+
+ public int transactionMaxTimeoutMs() {
+ return transactionMaxTimeoutMs;
+ }
+
+ public int transactionAbortTimedOutTransactionCleanupIntervalMs() {
+ return transactionAbortTimedOutTransactionCleanupIntervalMs;
+ }
+
+ public int transactionRemoveExpiredTransactionalIdCleanupIntervalMs() {
+ return transactionRemoveExpiredTransactionalIdCleanupIntervalMs;
+ }
}