This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b392cf212f KAFKA-14097: Separate configuration for producer ID expiry
(KIP-854) (#12501)
b392cf212f is described below
commit b392cf212f7ed4a82b79c3690b488619c027dba9
Author: Justine Olshan <[email protected]>
AuthorDate: Mon Aug 22 01:56:05 2022 -0700
KAFKA-14097: Separate configuration for producer ID expiry (KIP-854)
(#12501)
This patch implements "KIP-854: Separate configuration for producer ID
expiry" as described here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry.
Reviewers: David Jacot <[email protected]>
---
.../kafka/server/builders/LogManagerBuilder.java | 2 +
core/src/main/scala/kafka/log/LogManager.scala | 9 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 20 +++-
...onTest.scala => ProducerIdExpirationTest.scala} | 124 ++++++++++++++++-----
.../kafka/api/TransactionsExpirationTest.scala | 112 +++++++++++++++++--
.../src/test/scala/other/kafka/StressTestLog.scala | 2 +-
.../scala/other/kafka/TestLinearWriteSpeed.scala | 2 +-
.../log/AbstractLogCleanerIntegrationTest.scala | 2 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 7 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 5 +-
.../scala/unit/kafka/log/LogConcurrencyTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 10 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 4 +-
.../test/scala/unit/kafka/log/LogTestUtils.scala | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 2 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 4 +-
.../scala/unit/kafka/utils/SchedulerTest.scala | 5 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 1 +
19 files changed, 248 insertions(+), 69 deletions(-)
diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 6b6bd919fe..0a7d692a58 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -46,6 +46,7 @@ public class LogManagerBuilder {
private long retentionCheckMs = 1000L;
private int maxTransactionTimeoutMs = 15 * 60 * 1000;
private int maxPidExpirationMs = 60000;
+ private int producerIdExpirationCheckIntervalMs = 600000;
private MetadataVersion interBrokerProtocolVersion =
MetadataVersion.latest();
private Scheduler scheduler = null;
private BrokerTopicStats brokerTopicStats = null;
@@ -164,6 +165,7 @@ public class LogManagerBuilder {
retentionCheckMs,
maxTransactionTimeoutMs,
maxPidExpirationMs,
+ producerIdExpirationCheckIntervalMs,
interBrokerProtocolVersion,
scheduler,
brokerTopicStats,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 886f56c63c..9d779323e5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -66,6 +66,7 @@ class LogManager(logDirs: Seq[File],
val retentionCheckMs: Long,
val maxTransactionTimeoutMs: Int,
val maxPidExpirationMs: Int,
+ val producerIdExpirationCheckIntervalMs: Int,
interBrokerProtocolVersion: MetadataVersion,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
@@ -276,7 +277,7 @@ class LogManager(logDirs: Seq[File],
recoveryPoint = logRecoveryPoint,
maxTransactionTimeoutMs = maxTransactionTimeoutMs,
maxProducerIdExpirationMs = maxPidExpirationMs,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs =
producerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats,
@@ -950,7 +951,7 @@ class LogManager(logDirs: Seq[File],
recoveryPoint = 0L,
maxTransactionTimeoutMs = maxTransactionTimeoutMs,
maxProducerIdExpirationMs = maxPidExpirationMs,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs =
producerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats,
@@ -1347,7 +1348,6 @@ object LogManager {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
- val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
@@ -1375,7 +1375,8 @@ object LogManager {
flushStartOffsetCheckpointMs =
config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
- maxPidExpirationMs = config.transactionalIdExpirationMs,
+ maxPidExpirationMs = config.producerIdExpirationMs,
+ producerIdExpirationCheckIntervalMs =
config.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 860056f9a3..0c222f92e9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -211,6 +211,9 @@ object Defaults {
val TransactionsAbortTimedOutTransactionsCleanupIntervalMS =
TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
val TransactionsRemoveExpiredTransactionsCleanupIntervalMS =
TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs
+ val ProducerIdExpirationMs = 86400000
+ val ProducerIdExpirationCheckIntervalMs = 600000
+
/** ********* Fetch Configuration **************/
val MaxIncrementalFetchSessionCacheSlots = 1000
val FetchMaxBytes = 55 * 1024 * 1024
@@ -534,6 +537,9 @@ object KafkaConfig {
val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp =
"transaction.abort.timed.out.transaction.cleanup.interval.ms"
val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp =
"transaction.remove.expired.transaction.cleanup.interval.ms"
+ val ProducerIdExpirationMsProp = "producer.id.expiration.ms"
+ val ProducerIdExpirationCheckIntervalMsProp =
"producer.id.expiration.check.ms"
+
/** ********* Fetch Configuration **************/
val MaxIncrementalFetchSessionCacheSlots =
"max.incremental.fetch.session.cache.slots"
val FetchMaxBytes = "fetch.max.bytes"
@@ -952,8 +958,7 @@ object KafkaConfig {
val OffsetCommitRequiredAcksDoc = "The required acks before the commit can
be accepted. In general, the default (-1) should not be overridden"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsDoc = "The time in ms that the transaction
coordinator will wait without receiving any transaction status updates " +
- "for the current transaction before expiring its transactional id. This
setting also influences producer id expiration - producer ids are expired " +
- "once this time has elapsed after the last write with the given producer
id. Note that producer ids may expire sooner if the last write from the
producer id is deleted due to the topic's retention settings."
+ "for the current transaction before expiring its transactional id.
Transactional IDs will not expire while a the transaction is still ongoing."
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for
transactions. " +
"If a client’s requested transaction time exceed this, then the broker
will return an error in InitProducerIdRequest. This prevents a client from too
large of a timeout, which can stall consumers reading from topics included in
the transaction."
val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + "
config for the transaction topic."
@@ -965,6 +970,11 @@ object KafkaConfig {
val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at
which to rollback transactions that have timed out"
val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at
which to remove transactions that have expired due to
<code>transactional.id.expiration.ms</code> passing"
+ val ProducerIdExpirationMsDoc = "The time in ms that a topic partition
leader will wait before expiring producer IDs. Producer IDs will not expire
while a transaction associated to them is still ongoing. " +
+ "Note that producer IDs may expire sooner if the last write from the
producer ID is deleted due to the topic's retention settings. Setting this
value the same or higher than " +
+ "<code>delivery.timeout.ms</code> can help prevent expiration during
retries and protect against message duplication, but the default should be
reasonable for most use cases."
+ val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to
remove producer IDs that have expired due to
<code>producer.id.expiration.ms</code> passing"
+
/** ********* Fetch Configuration **************/
val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of
incremental fetch sessions that we will maintain."
val FetchMaxBytesDoc = "The maximum number of bytes we will return for a
fetch request. Must be at least 1024."
@@ -1288,6 +1298,10 @@ object KafkaConfig {
.define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT,
Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1),
LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
.define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp,
INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS,
atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
+ .define(ProducerIdExpirationMsProp, INT,
Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+ // Configuration for testing only as default value should be sufficient
for typical usage
+ .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT,
Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW,
ProducerIdExpirationMsDoc)
+
/** ********* Fetch Configuration **************/
.define(MaxIncrementalFetchSessionCacheSlots, INT,
Defaults.MaxIncrementalFetchSessionCacheSlots, atLeast(0), MEDIUM,
MaxIncrementalFetchSessionCacheSlotsDoc)
.define(FetchMaxBytes, INT, Defaults.FetchMaxBytes, atLeast(1024),
MEDIUM, FetchMaxBytesDoc)
@@ -1849,6 +1863,8 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
val transactionAbortTimedOutTransactionCleanupIntervalMs =
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs =
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
+ val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+ val producerIdExpirationCheckIntervalMs =
getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
/** ********* Metric Configuration **************/
val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
similarity index 51%
copy from
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
copy to core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index ddf3a97460..376bf1560a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -17,15 +17,19 @@
package kafka.api
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -33,15 +37,15 @@ import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
-// Test class that uses a very small transaction timeout to trigger
InvalidPidMapping errors
-class TransactionsExpirationTest extends KafkaServerTestHarness {
+class ProducerIdExpirationTest extends KafkaServerTestHarness {
val topic1 = "topic1"
- val topic2 = "topic2"
- val numPartitions = 4
+ val numPartitions = 1
val replicationFactor = 3
+ val tp0 = new TopicPartition(topic1, 0)
var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+ var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(3,
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -50,66 +54,124 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
-
- producer = TestUtils.createTransactionalProducer("transactionalProducer",
brokers)
consumer = TestUtils.createConsumer(bootstrapServers(),
enableAutoCommit = false,
readCommitted = true)
+ admin = createAdminClient(brokers, listenerName)
createTopic(topic1, numPartitions, 3)
- createTopic(topic2, numPartitions, 3)
}
@AfterEach
override def tearDown(): Unit = {
- producer.close()
- consumer.close()
+ if (producer != null)
+ producer.close()
+ if (consumer != null)
+ consumer.close()
+ if (admin != null)
+ admin.close()
super.tearDown()
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
- def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String):
Unit = {
+ def testProducerIdExpirationWithNoTransactions(quorum: String): Unit = {
+ producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence
= true)
+
+ // Send records to populate producer state cache.
+ producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes,
"value".getBytes))
+ producer.flush()
+
+ // Ensure producer IDs are added.
+ ensureConsistentKRaftMetadata()
+ assertEquals(1, producerState.size)
+
+ // Wait for the producer ID to expire.
+ TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not
expire.")
+
+ // Send more records to send producer ID back to brokers.
+ producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes,
"value".getBytes))
+ producer.flush()
+
+ // Producer IDs should repopulate.
+ assertEquals(1, producerState.size)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTransactionAfterTransactionIdExpiresButProducerIdRemains(quorum:
String): Unit = {
+ producer = TestUtils.createTransactionalProducer("transactionalProducer",
brokers)
producer.initTransactions()
- // Start and then abort a transaction to allow the transactional ID to
expire
+ // Start and then abort a transaction to allow the producer ID to expire.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"2", "2", willBeCommitted = false))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0,
"4", "4", willBeCommitted = false))
+ producer.flush()
+
+ // Ensure producer IDs are added.
+ assertEquals(1, producerState.size)
+
producer.abortTransaction()
- // Wait for the transactional ID to expire
- Thread.sleep(3000)
+ // Wait for the transactional ID to expire.
+ waitUntilTransactionalStateExpires()
- // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+ // Producer IDs should be retained.
+ assertEquals(1, producerState.size)
+
+ // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
producer.beginTransaction()
- val failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"1", "1", willBeCommitted = false))
- Thread.sleep(500)
+ val failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"1", "1", willBeCommitted = false))
+ TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never
completed.")
- org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
+ JTestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
producer.abortTransaction()
producer.beginTransaction()
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2,
null, "2", "2", willBeCommitted = true))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2,
"4", "4", willBeCommitted = true))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2,
null, "1", "1", willBeCommitted = true))
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"3", "3", willBeCommitted = true))
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"4", "4", willBeCommitted = true))
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"3", "3", willBeCommitted = true))
+
+ // Producer IDs should be retained.
+ assertEquals(1, producerState.size)
+
producer.commitTransaction()
- consumer.subscribe(List(topic1, topic2).asJava)
+ // Check we can still consume the transaction.
+ consumer.subscribe(List(topic1).asJava)
- val records = consumeRecords(consumer, 4)
+ val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
+ private def producerState: List[ProducerState] = {
+ val describeResult =
admin.describeProducers(Collections.singletonList(tp0))
+ val activeProducers =
describeResult.partitionResult(tp0).get().activeProducers
+ activeProducers.asScala.toList
+ }
+
+ private def waitUntilTransactionalStateExpires(): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ var removedTransactionState = false
+ val txnDescribeResult =
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+ try {
+ txnDescribeResult.get()
+ } catch {
+ case e: Exception => {
+ removedTransactionState =
e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+ }
+ }
+ removedTransactionState
+ }, "Transaction state never expired.")
+ }
+
private def serverProps(): Properties = {
val serverProps = new Properties()
serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
// Set a smaller value for the number of partitions for the
__consumer_offsets topic
- // so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long
+ // so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long.
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp,
2.toString)
@@ -119,8 +181,10 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp,
"200")
- serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+ serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "500")
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp,
"500")
+ serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "2000")
+ serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
serverProps
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index ddf3a97460..79fc67e4c5 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -17,15 +17,18 @@
package kafka.api
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -39,9 +42,11 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
val topic2 = "topic2"
val numPartitions = 4
val replicationFactor = 3
+ val tp0 = new TopicPartition(topic1, 0)
var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+ var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(3,
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -55,6 +60,7 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
consumer = TestUtils.createConsumer(bootstrapServers(),
enableAutoCommit = false,
readCommitted = true)
+ admin = createAdminClient(brokers, listenerName)
createTopic(topic1, numPartitions, 3)
createTopic(topic2, numPartitions, 3)
@@ -62,8 +68,12 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
@AfterEach
override def tearDown(): Unit = {
- producer.close()
- consumer.close()
+ if (producer != null)
+ producer.close()
+ if (consumer != null)
+ consumer.close()
+ if (admin != null)
+ admin.close()
super.tearDown()
}
@@ -73,19 +83,20 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String):
Unit = {
producer.initTransactions()
- // Start and then abort a transaction to allow the transactional ID to
expire
+ // Start and then abort a transaction to allow the transactional ID to
expire.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"2", "2", willBeCommitted = false))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0,
"4", "4", willBeCommitted = false))
producer.abortTransaction()
- // Wait for the transactional ID to expire
- Thread.sleep(3000)
+ // Check the transactional state exists and then wait for it to expire.
+ waitUntilTransactionalStateExists()
+ waitUntilTransactionalStateExpires()
- // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+ // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
producer.beginTransaction()
val failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"1", "1", willBeCommitted = false))
- Thread.sleep(500)
+ TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never
completed.")
org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
producer.abortTransaction()
@@ -97,6 +108,8 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"3", "3", willBeCommitted = true))
producer.commitTransaction()
+ waitUntilTransactionalStateExists()
+
consumer.subscribe(List(topic1, topic2).asJava)
val records = consumeRecords(consumer, 4)
@@ -105,11 +118,84 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
}
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTransactionAfterProducerIdExpires(quorum: String): Unit = {
+ producer.initTransactions()
+
+ // Start and then abort a transaction to allow the producer ID to expire.
+ producer.beginTransaction()
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"2", "2", willBeCommitted = false))
+ producer.flush()
+
+ // Ensure producer IDs are added.
+ val pState = producerState
+ assertEquals(1, pState.size)
+ val oldProducerId = pState(0).producerId
+
+ producer.abortTransaction()
+
+ // Wait for the producer ID to expire.
+ TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer IDs for
topic1 did not expire.")
+
+ // Create a new producer to check that we retain the producer ID in
transactional state.
+ producer.close()
+ producer = TestUtils.createTransactionalProducer("transactionalProducer",
brokers)
+ producer.initTransactions()
+
+ // Start a new transaction and attempt to send. This should work since
only the producer ID was removed from its mapping in ProducerStateManager.
+ producer.beginTransaction()
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"4", "4", willBeCommitted = true))
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"3", "3", willBeCommitted = true))
+ producer.commitTransaction()
+
+ // Producer IDs should repopulate.
+ val pState2 = producerState
+ assertEquals(1, pState2.size)
+ val newProducerId = pState2(0).producerId
+
+ // Producer IDs should be the same.
+ assertEquals(oldProducerId, newProducerId)
+
+ consumer.subscribe(List(topic1).asJava)
+
+ val records = consumeRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+ }
+
+ private def producerState: List[ProducerState] = {
+ val describeResult =
admin.describeProducers(Collections.singletonList(tp0))
+ val activeProducers =
describeResult.partitionResult(tp0).get().activeProducers
+ activeProducers.asScala.toList
+ }
+
+ private def waitUntilTransactionalStateExpires(): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ var removedTransactionState = false
+ val txnDescribeResult =
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+ try {
+ txnDescribeResult.get()
+ } catch {
+ case e: Exception => {
+ removedTransactionState =
e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+ }
+ }
+ removedTransactionState
+ }, "Transaction state never expired.")
+ }
+
+ private def waitUntilTransactionalStateExists(): Unit = {
+ val describeState =
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+ TestUtils.waitUntilTrue(() => describeState.isDone, "Transactional state
was never added.")
+ }
+
private def serverProps(): Properties = {
val serverProps = new Properties()
serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
// Set a smaller value for the number of partitions for the
__consumer_offsets topic
- // so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long
+ // so that the creation of that topic/partition(s) and subsequent leader
assignment doesn't take relatively long.
serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp,
2.toString)
@@ -119,8 +205,10 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp,
"200")
- serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+ serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "1000")
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp,
"500")
+ serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "500")
+ serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
serverProps
}
}
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala
b/core/src/test/scala/other/kafka/StressTestLog.scala
index a90690ad2a..95ba597b08 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
time = time,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index c342e71361..94609a6bb9 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -220,7 +220,7 @@ object TestLinearWriteSpeed {
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 381ec93a0a..ac73566875 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -111,7 +111,7 @@ abstract class AbstractLogCleanerIntegrationTest {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 85745bfe67..ede47de47c 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index fdc05c74f8..2c05286d04 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,6 +100,7 @@ class LogCleanerManagerTest extends Logging {
val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir,
topicPartition, logDirFailureChannel, config.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, tpDir,
maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
@@ -121,7 +122,7 @@ class LogCleanerManagerTest extends Logging {
offsets.nextOffsetMetadata, time.scheduler, time, tp,
logDirFailureChannel)
// the exception should be caught and the partition that caused it marked
as uncleanable
class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new
BrokerTopicStats,
- LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+ producerIdExpirationCheckIntervalMs, leaderEpochCache,
producerStateManager, _topicId = None, keepPartitionMetadataFile =
true) {
// Throw an error in getFirstBatchTimestampForSegments since it is
called in grabFilthiestLog()
override def getFirstBatchTimestampForSegments(segments:
Iterable[LogSegment]): Iterable[Long] =
@@ -808,7 +809,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true)
@@ -862,7 +863,7 @@ class LogCleanerManagerTest extends Logging {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 949e0c59df..19a52268f9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -106,6 +106,7 @@ class LogCleanerTest {
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir,
topicPartition, logDirFailureChannel, config.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, dir,
@@ -129,7 +130,7 @@ class LogCleanerTest {
val log = new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats = new BrokerTopicStats,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs =
producerIdExpirationCheckIntervalMs,
leaderEpochCache = leaderEpochCache,
producerStateManager = producerStateManager,
_topicId = None,
@@ -1973,7 +1974,7 @@ class LogCleanerTest {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index db3222bb7f..c60e661a4b 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -150,7 +150,7 @@ class LogConcurrencyTest {
time = Time.SYSTEM,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c6379ff3f3..168d6e0b05 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -49,6 +49,7 @@ class LogLoaderTest {
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val mockTime = new MockTime()
@@ -89,6 +90,7 @@ class LogLoaderTest {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
// Create a LogManager with some overridden methods to facilitate
interception of clean shutdown
// flag and to inject an error
@@ -109,6 +111,7 @@ class LogLoaderTest {
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = maxTransactionTimeoutMs,
maxPidExpirationMs = maxProducerIdExpirationMs,
+ producerIdExpirationCheckIntervalMs =
producerIdExpirationCheckIntervalMs,
interBrokerProtocolVersion = config.interBrokerProtocolVersion,
scheduler = time.scheduler,
brokerTopicStats = new BrokerTopicStats(),
@@ -149,7 +152,7 @@ class LogLoaderTest {
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime,
topicPartition,
logDirFailureChannel)
new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
- LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+ producerIdExpirationCheckIntervalMs, leaderEpochCache,
producerStateManager, None, true)
}
}
@@ -246,7 +249,7 @@ class LogLoaderTest {
time: Time = mockTime,
maxTransactionTimeoutMs: Int = maxTransactionTimeoutMs,
maxProducerIdExpirationMs: Int =
maxProducerIdExpirationMs,
- producerIdExpirationCheckIntervalMs: Int =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs: Int =
producerIdExpirationCheckIntervalMs,
lastShutdownClean: Boolean = true): UnifiedLog = {
LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time,
logStartOffset, recoveryPoint,
maxTransactionTimeoutMs, maxProducerIdExpirationMs,
producerIdExpirationCheckIntervalMs, lastShutdownClean)
@@ -330,6 +333,7 @@ class LogLoaderTest {
def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
// Intercept all segment read calls
@@ -373,7 +377,7 @@ class LogLoaderTest {
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime,
topicPartition,
logDirFailureChannel)
new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
- LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
producerStateManager,
+ producerIdExpirationCheckIntervalMs, leaderEpochCache,
producerStateManager,
None, keepPartitionMetadataFile = true)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1b2dd7809f..001c62aa2d 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -647,7 +647,7 @@ class LogManagerTest {
val segmentBytes = 1024
val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats,
time.scheduler, time, 0, 0,
- 5 * 60 * 1000, 60 * 60 * 1000,
LogManager.ProducerIdExpirationCheckIntervalMs)
+ 5 * 60 * 1000, 60 * 60 * 1000, 10 * 60 * 1000)
assertTrue(expectedSegmentsPerLog > 0)
// calculate numMessages to append to logs. It'll create
"expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -783,7 +783,7 @@ class LogManagerTest {
recoveryPoint = 0,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 5 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
scheduler = mockTime.scheduler,
time = mockTime,
brokerTopicStats = mockBrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 50af76f556..4ff4c25601 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -81,7 +81,7 @@ object LogTestUtils {
recoveryPoint: Long = 0L,
maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs: Int =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 57409a1f03..9db288b529 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3505,7 +3505,7 @@ class UnifiedLogTest {
time: Time = mockTime,
maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs: Int =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs: Int = 10 * 60 *
1000,
lastShutdownClean: Boolean = true,
topicId: Option[Uuid] = None,
keepPartitionMetadataFile: Boolean = true): UnifiedLog
= {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 5d5e462b5a..4e76dcd00b 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.Properties
-import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils,
UnifiedLog}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer,
LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
@@ -75,7 +75,7 @@ class DumpLogSegmentsTest {
brokerTopicStats = new BrokerTopicStats,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
logDirFailureChannel = new LogDirFailureChannel(10),
topicId = None,
keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 91c8f270f3..c39c73f728 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,7 +19,7 @@ package kafka.utils
import java.util.Properties
import java.util.concurrent.atomic._
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogManager,
LogSegments, ProducerStateManager}
+import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogSegments,
ProducerStateManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.TestUtils.retry
import org.junit.jupiter.api.Assertions._
@@ -120,6 +120,7 @@ class SchedulerTest {
val brokerTopicStats = new BrokerTopicStats
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
@@ -144,7 +145,7 @@ class SchedulerTest {
offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition,
logDirFailureChannel)
val log = new UnifiedLog(logStartOffset = offsets.logStartOffset,
localLog = localLog,
- brokerTopicStats, LogManager.ProducerIdExpirationCheckIntervalMs,
+ brokerTopicStats, producerIdExpirationCheckIntervalMs,
leaderEpochCache, producerStateManager,
_topicId = None, keepPartitionMetadataFile = true)
assertTrue(scheduler.taskRunning(log.producerExpireCheck))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981da..592d02dc4b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1346,6 +1346,7 @@ object TestUtils extends Logging {
retentionCheckMs = 1000L,
maxTransactionTimeoutMs = 5 * 60 * 1000,
maxPidExpirationMs = 60 * 60 * 1000,
+ producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats,