This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b392cf212f KAFKA-14097: Separate configuration for producer ID expiry (KIP-854) (#12501)
b392cf212f is described below

commit b392cf212f7ed4a82b79c3690b488619c027dba9
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Mon Aug 22 01:56:05 2022 -0700

    KAFKA-14097: Separate configuration for producer ID expiry (KIP-854) (#12501)
    
    This patch implements "KIP-854: Separate configuration for producer ID expiry" as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../kafka/server/builders/LogManagerBuilder.java   |   2 +
 core/src/main/scala/kafka/log/LogManager.scala     |   9 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  20 +++-
 ...onTest.scala => ProducerIdExpirationTest.scala} | 124 ++++++++++++++++-----
 .../kafka/api/TransactionsExpirationTest.scala     | 112 +++++++++++++++++--
 .../src/test/scala/other/kafka/StressTestLog.scala |   2 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |   2 +-
 .../log/AbstractLogCleanerIntegrationTest.scala    |   2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala     |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   7 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../scala/unit/kafka/log/LogConcurrencyTest.scala  |   2 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  10 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   4 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   2 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   4 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |   5 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   1 +
 19 files changed, 248 insertions(+), 69 deletions(-)
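
For readers skimming the patch below, the net effect is two new broker properties: producer.id.expiration.ms (default 86400000 ms, i.e. 24 hours) and the internal producer.id.expiration.check.ms (default 600000 ms). A minimal sketch of how an override flows into the broker configuration, assuming a hypothetical ZooKeeper-mode Properties object and using the KafkaConfig.fromProps helper plus the new accessors introduced in this commit:

    import java.util.Properties
    import kafka.server.KafkaConfig

    object ProducerIdExpiryConfigSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical minimal broker properties; zookeeper.connect is a placeholder.
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181")
        // Expire idle producer IDs after 12 hours instead of the 24-hour default.
        props.put(KafkaConfig.ProducerIdExpirationMsProp, (12 * 60 * 60 * 1000).toString)

        val config = KafkaConfig.fromProps(props)
        // Accessors added by this patch.
        println(s"producer.id.expiration.ms       = ${config.producerIdExpirationMs}")
        println(s"producer.id.expiration.check.ms = ${config.producerIdExpirationCheckIntervalMs}")
      }
    }

The check-interval property is registered with defineInternal, so it is intended for tests rather than production tuning; only producer.id.expiration.ms is expected to be set by operators.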

diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 6b6bd919fe..0a7d692a58 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -46,6 +46,7 @@ public class LogManagerBuilder {
     private long retentionCheckMs = 1000L;
     private int maxTransactionTimeoutMs = 15 * 60 * 1000;
     private int maxPidExpirationMs = 60000;
+    private int producerIdExpirationCheckIntervalMs = 600000;
     private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
     private Scheduler scheduler = null;
     private BrokerTopicStats brokerTopicStats = null;
@@ -164,6 +165,7 @@ public class LogManagerBuilder {
                               retentionCheckMs,
                               maxTransactionTimeoutMs,
                               maxPidExpirationMs,
+                              producerIdExpirationCheckIntervalMs,
                               interBrokerProtocolVersion,
                               scheduler,
                               brokerTopicStats,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 886f56c63c..9d779323e5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -66,6 +66,7 @@ class LogManager(logDirs: Seq[File],
                  val retentionCheckMs: Long,
                  val maxTransactionTimeoutMs: Int,
                  val maxPidExpirationMs: Int,
+                 val producerIdExpirationCheckIntervalMs: Int,
                  interBrokerProtocolVersion: MetadataVersion,
                  scheduler: Scheduler,
                  brokerTopicStats: BrokerTopicStats,
@@ -276,7 +277,7 @@ class LogManager(logDirs: Seq[File],
       recoveryPoint = logRecoveryPoint,
       maxTransactionTimeoutMs = maxTransactionTimeoutMs,
       maxProducerIdExpirationMs = maxPidExpirationMs,
-      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
       scheduler = scheduler,
       time = time,
       brokerTopicStats = brokerTopicStats,
@@ -950,7 +951,7 @@ class LogManager(logDirs: Seq[File],
           recoveryPoint = 0L,
           maxTransactionTimeoutMs = maxTransactionTimeoutMs,
           maxProducerIdExpirationMs = maxPidExpirationMs,
-          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+          producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
           scheduler = scheduler,
           time = time,
           brokerTopicStats = brokerTopicStats,
@@ -1347,7 +1348,6 @@ object LogManager {
 
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
-  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
@@ -1375,7 +1375,8 @@ object LogManager {
       flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = config.transactionMaxTimeoutMs,
-      maxPidExpirationMs = config.transactionalIdExpirationMs,
+      maxPidExpirationMs = config.producerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = config.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
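
The new constructor argument is consumed purely as a period: each UnifiedLog schedules its producer-state expiration check (the producerExpireCheck task asserted on in SchedulerTest further down) every producerIdExpirationCheckIntervalMs and drops producer IDs whose last append is older than the expiration timeout. The snippet below is not Kafka's implementation, just a self-contained Scala illustration of that scheduling pattern, with a plain map standing in for ProducerStateManager:

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
    import scala.jdk.CollectionConverters._

    // Illustrative only: expire "producer IDs" whose last append is older than
    // producerIdExpirationMs, checked every producerIdExpirationCheckIntervalMs.
    class ProducerIdExpirationSketch(producerIdExpirationMs: Long,
                                     producerIdExpirationCheckIntervalMs: Long) {
      // producerId -> timestamp of last append
      private val lastAppend = new ConcurrentHashMap[Long, Long]()
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def recordAppend(producerId: Long, nowMs: Long): Unit =
        lastAppend.put(producerId, nowMs)

      def start(): Unit =
        scheduler.scheduleAtFixedRate(
          () => removeExpired(System.currentTimeMillis()),
          producerIdExpirationCheckIntervalMs,
          producerIdExpirationCheckIntervalMs,
          TimeUnit.MILLISECONDS)

      private def removeExpired(nowMs: Long): Unit =
        lastAppend.entrySet().asScala
          .filter(e => nowMs - e.getValue >= producerIdExpirationMs)
          .foreach(e => lastAppend.remove(e.getKey))

      def activeProducerIds: Set[Long] = lastAppend.keySet().asScala.toSet

      def shutdown(): Unit = scheduler.shutdownNow()
    }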
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 860056f9a3..0c222f92e9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -211,6 +211,9 @@ object Defaults {
   val TransactionsAbortTimedOutTransactionsCleanupIntervalMS = TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
   val TransactionsRemoveExpiredTransactionsCleanupIntervalMS = TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs
 
+  val ProducerIdExpirationMs = 86400000
+  val ProducerIdExpirationCheckIntervalMs = 600000
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = 1000
   val FetchMaxBytes = 55 * 1024 * 1024
@@ -534,6 +537,9 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
   val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"
 
+  val ProducerIdExpirationMsProp = "producer.id.expiration.ms"
+  val ProducerIdExpirationCheckIntervalMsProp = "producer.id.expiration.check.ms"
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
   val FetchMaxBytes = "fetch.max.bytes"
@@ -952,8 +958,7 @@ object KafkaConfig {
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMsDoc = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " +
-    "for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired " +
-    "once this time has elapsed after the last write with the given producer id. Note that producer ids may expire sooner if the last write from the producer id is deleted due to the topic's retention settings."
+    "for the current transaction before expiring its transactional id. Transactional IDs will not expire while the transaction is still ongoing."
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
     "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
@@ -965,6 +970,11 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
   val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
 
+  val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " +
+    "Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " +
+    "<code>delivery.timeout.ms</code> can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases."
+  val ProducerIdExpirationCheckIntervalMsDoc = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing"
+
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
   val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
@@ -1288,6 +1298,10 @@ object KafkaConfig {
       .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
       .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
 
+      .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+      // Configuration for testing only as default value should be sufficient for typical usage
+      .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW, ProducerIdExpirationCheckIntervalMsDoc)
+
       /** ********* Fetch Configuration **************/
       .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MaxIncrementalFetchSessionCacheSlots, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
       .define(FetchMaxBytes, INT, Defaults.FetchMaxBytes, atLeast(1024), MEDIUM, FetchMaxBytesDoc)
@@ -1849,6 +1863,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
 
+  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
 
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
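
The ProducerIdExpirationMsDoc added above calls out the relationship with the client-side delivery.timeout.ms: if the partition leader forgets a producer ID while that producer is still retrying a batch, the retry can no longer be deduplicated. A hedged illustration of that pairing, with a placeholder bootstrap address and only standard client config constants:

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.ByteArraySerializer

    object ProducerIdExpiryPairingSketch {
      def main(args: Array[String]): Unit = {
        // Client side: an idempotent producer that may retry a batch for up to 2 minutes.
        val producerProps = new Properties()
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000")
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

        // Broker side (server.properties): keep producer.id.expiration.ms at or above
        // delivery.timeout.ms, as the doc string recommends. The 24-hour default already
        // satisfies this; an explicit override is shown only for illustration.
        val brokerOverrides = Map("producer.id.expiration.ms" -> "86400000")

        println(s"delivery.timeout.ms = ${producerProps.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)}")
        println(s"broker overrides    = $brokerOverrides")
      }
    }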
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
similarity index 51%
copy from 
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
copy to core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index ddf3a97460..376bf1560a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -17,15 +17,19 @@
 
 package kafka.api
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -33,15 +37,15 @@ import org.junit.jupiter.params.provider.ValueSource
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-// Test class that uses a very small transaction timeout to trigger 
InvalidPidMapping errors
-class TransactionsExpirationTest extends KafkaServerTestHarness {
+class ProducerIdExpirationTest extends KafkaServerTestHarness {
   val topic1 = "topic1"
-  val topic2 = "topic2"
-  val numPartitions = 4
+  val numPartitions = 1
   val replicationFactor = 3
+  val tp0 = new TopicPartition(topic1, 0)
 
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+  var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(3, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -50,66 +54,124 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-
-    producer = TestUtils.createTransactionalProducer("transactionalProducer", 
brokers)
     consumer = TestUtils.createConsumer(bootstrapServers(),
       enableAutoCommit = false,
       readCommitted = true)
+    admin = createAdminClient(brokers, listenerName)
 
     createTopic(topic1, numPartitions, 3)
-    createTopic(topic2, numPartitions, 3)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    producer.close()
-    consumer.close()
+    if (producer != null)
+      producer.close()
+    if (consumer != null)
+      consumer.close()
+    if (admin != null)
+      admin.close()
 
     super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String): 
Unit = {
+  def testProducerIdExpirationWithNoTransactions(quorum: String): Unit = {
+    producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence 
= true)
+
+    // Send records to populate producer state cache.
+    producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, 
"value".getBytes))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    ensureConsistentKRaftMetadata()
+    assertEquals(1, producerState.size)
+
+    // Wait for the producer ID to expire.
+    TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not 
expire.")
+
+    // Send more records to send producer ID back to brokers.
+    producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, 
"value".getBytes))
+    producer.flush()
+
+    // Producer IDs should repopulate.
+    assertEquals(1, producerState.size)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionAfterTransactionIdExpiresButProducerIdRemains(quorum: 
String): Unit = {
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", 
brokers)
     producer.initTransactions()
 
-    // Start and then abort a transaction to allow the transactional ID to 
expire
+    // Start and then abort a transaction to allow the producer ID to expire.
     producer.beginTransaction()
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"2", "2", willBeCommitted = false))
-    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0, 
"4", "4", willBeCommitted = false))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    assertEquals(1, producerState.size)
+
     producer.abortTransaction()
 
-    // Wait for the transactional ID to expire
-    Thread.sleep(3000)
+    // Wait for the transactional ID to expire.
+    waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+    // Producer IDs should be retained.
+    assertEquals(1, producerState.size)
+
+    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
     producer.beginTransaction()
-    val failedFuture = 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"1", "1", willBeCommitted = false))
-    Thread.sleep(500)
+    val failedFuture = 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"1", "1", willBeCommitted = false))
+    TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never 
completed.")
 
-    org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
+    JTestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
     producer.abortTransaction()
 
     producer.beginTransaction()
-    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 
null, "2", "2", willBeCommitted = true))
-    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2, 
"4", "4", willBeCommitted = true))
-    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 
null, "1", "1", willBeCommitted = true))
-    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"3", "3", willBeCommitted = true))
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"4", "4", willBeCommitted = true))
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"3", "3", willBeCommitted = true))
+
+    // Producer IDs should be retained.
+    assertEquals(1, producerState.size)
+
     producer.commitTransaction()
 
-    consumer.subscribe(List(topic1, topic2).asJava)
+    // Check we can still consume the transaction.
+    consumer.subscribe(List(topic1).asJava)
 
-    val records = consumeRecords(consumer, 4)
+    val records = consumeRecords(consumer, 2)
     records.foreach { record =>
       TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
+  private def producerState: List[ProducerState] = {
+    val describeResult = 
admin.describeProducers(Collections.singletonList(tp0))
+    val activeProducers = 
describeResult.partitionResult(tp0).get().activeProducers
+    activeProducers.asScala.toList
+  }
+
+  private def waitUntilTransactionalStateExpires(): Unit = {
+    TestUtils.waitUntilTrue(() =>  {
+      var removedTransactionState = false
+      val txnDescribeResult = 
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+      try {
+        txnDescribeResult.get()
+      } catch {
+        case e: Exception => {
+          removedTransactionState = 
e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+        }
+      }
+      removedTransactionState
+    }, "Transaction state never expired.")
+  }
+
   private def serverProps(): Properties = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
     // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
-    // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long
+    // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long.
     serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
     serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
     serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 
2.toString)
@@ -119,8 +181,10 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
     serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp,
 "200")
-    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "500")
     
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp,
 "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
     serverProps
   }
 }
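
The producerState helper in the new test doubles as the simplest way to observe expiry from outside the broker: Admin#describeProducers reports the producer IDs a partition leader is currently tracking. A standalone sketch of that check, assuming a reachable broker at a placeholder address and an existing topic named topic1:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.TopicPartition
    import scala.jdk.CollectionConverters._

    object DescribeProducerIdsSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
        val admin = Admin.create(props)
        try {
          val tp0 = new TopicPartition("topic1", 0)
          // Same call the test uses: active producer IDs tracked by the partition leader.
          val producers = admin.describeProducers(Collections.singletonList(tp0))
            .partitionResult(tp0)
            .get()
            .activeProducers()
            .asScala
          producers.foreach(p => println(s"producerId=${p.producerId} epoch=${p.producerEpoch}"))
        } finally {
          admin.close()
        }
      }
    }

Once producer.id.expiration.ms has elapsed with no further appends (and no open transaction), the same call returns an empty list, which is exactly what the test waits for.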
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index ddf3a97460..79fc67e4c5 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -17,15 +17,18 @@
 
 package kafka.api
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils.consumeRecords
+import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
+import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.errors.InvalidPidMappingException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -39,9 +42,11 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
   val topic2 = "topic2"
   val numPartitions = 4
   val replicationFactor = 3
+  val tp0 = new TopicPartition(topic1, 0)
 
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
+  var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(3, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
@@ -55,6 +60,7 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     consumer = TestUtils.createConsumer(bootstrapServers(),
       enableAutoCommit = false,
       readCommitted = true)
+    admin = createAdminClient(brokers, listenerName)
 
     createTopic(topic1, numPartitions, 3)
     createTopic(topic2, numPartitions, 3)
@@ -62,8 +68,12 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-    producer.close()
-    consumer.close()
+    if (producer != null)
+      producer.close()
+    if (consumer != null)
+      consumer.close()
+    if (admin != null)
+      admin.close()
 
     super.tearDown()
   }
@@ -73,19 +83,20 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
   def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String): 
Unit = {
     producer.initTransactions()
 
-    // Start and then abort a transaction to allow the transactional ID to 
expire
+    // Start and then abort a transaction to allow the transactional ID to 
expire.
     producer.beginTransaction()
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"2", "2", willBeCommitted = false))
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0, 
"4", "4", willBeCommitted = false))
     producer.abortTransaction()
 
-    // Wait for the transactional ID to expire
-    Thread.sleep(3000)
+    // Check the transactional state exists and then wait for it to expire.
+    waitUntilTransactionalStateExists()
+    waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired producer ID
+    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
     producer.beginTransaction()
     val failedFuture = 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"1", "1", willBeCommitted = false))
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never 
completed.")
 
     org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
     producer.abortTransaction()
@@ -97,6 +108,8 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"3", "3", willBeCommitted = true))
     producer.commitTransaction()
 
+    waitUntilTransactionalStateExists()
+
     consumer.subscribe(List(topic1, topic2).asJava)
 
     val records = consumeRecords(consumer, 4)
@@ -105,11 +118,84 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionAfterProducerIdExpires(quorum: String): Unit = {
+    producer.initTransactions()
+
+    // Start and then abort a transaction to allow the producer ID to expire.
+    producer.beginTransaction()
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"2", "2", willBeCommitted = false))
+    producer.flush()
+
+    // Ensure producer IDs are added.
+    val pState = producerState
+    assertEquals(1, pState.size)
+    val oldProducerId = pState(0).producerId
+
+    producer.abortTransaction()
+
+    // Wait for the producer ID to expire.
+    TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer IDs for 
topic1 did not expire.")
+
+    // Create a new producer to check that we retain the producer ID in 
transactional state.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", 
brokers)
+    producer.initTransactions()
+
+    // Start a new transaction and attempt to send. This should work since 
only the producer ID was removed from its mapping in ProducerStateManager.
+    producer.beginTransaction()
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"4", "4", willBeCommitted = true))
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"3", "3", willBeCommitted = true))
+    producer.commitTransaction()
+
+    // Producer IDs should repopulate.
+    val pState2 = producerState
+    assertEquals(1, pState2.size)
+    val newProducerId = pState2(0).producerId
+
+    // Producer IDs should be the same.
+    assertEquals(oldProducerId, newProducerId)
+
+    consumer.subscribe(List(topic1).asJava)
+
+    val records = consumeRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
+    }
+  }
+
+  private def producerState: List[ProducerState] = {
+    val describeResult = 
admin.describeProducers(Collections.singletonList(tp0))
+    val activeProducers = 
describeResult.partitionResult(tp0).get().activeProducers
+    activeProducers.asScala.toList
+  }
+
+  private def waitUntilTransactionalStateExpires(): Unit = {
+    TestUtils.waitUntilTrue(() =>  {
+      var removedTransactionState = false
+      val txnDescribeResult = 
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+      try {
+        txnDescribeResult.get()
+      } catch {
+        case e: Exception => {
+          removedTransactionState = 
e.getCause.isInstanceOf[TransactionalIdNotFoundException]
+        }
+      }
+      removedTransactionState
+    }, "Transaction state never expired.")
+  }
+
+  private def waitUntilTransactionalStateExists(): Unit = {
+    val describeState = 
admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
+    TestUtils.waitUntilTrue(() => describeState.isDone, "Transactional state 
was never added.")
+  }
+
   private def serverProps(): Properties = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
     // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
-    // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long
+    // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long.
     serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
     serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
     serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 
2.toString)
@@ -119,8 +205,10 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
     serverProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     
serverProps.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp,
 "200")
-    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "2000")
+    serverProps.put(KafkaConfig.TransactionalIdExpirationMsProp, "1000")
     
serverProps.put(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp,
 "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationMsProp, "500")
+    serverProps.put(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp, "500")
     serverProps
   }
 }
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala 
b/core/src/test/scala/other/kafka/StressTestLog.scala
index a90690ad2a..95ba597b08 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
       time = time,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index c342e71361..94609a6bb9 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -220,7 +220,7 @@ object TestLinearWriteSpeed {
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 381ec93a0a..ac73566875 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -111,7 +111,7 @@ abstract class AbstractLogCleanerIntegrationTest {
         brokerTopicStats = new BrokerTopicStats,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
         maxProducerIdExpirationMs = 60 * 60 * 1000,
-        producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+        producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
         logDirFailureChannel = new LogDirFailureChannel(10),
         topicId = None,
         keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 
b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 85745bfe67..ede47de47c 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -62,7 +62,7 @@ class BrokerCompressionTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index fdc05c74f8..2c05286d04 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,6 +100,7 @@ class LogCleanerManagerTest extends Logging {
     val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val segments = new LogSegments(tp)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, 
topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, tpDir, 
maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
@@ -121,7 +122,7 @@ class LogCleanerManagerTest extends Logging {
       offsets.nextOffsetMetadata, time.scheduler, time, tp, 
logDirFailureChannel)
     // the exception should be caught and the partition that caused it marked 
as uncleanable
     class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new 
BrokerTopicStats,
-        LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+        producerIdExpirationCheckIntervalMs, leaderEpochCache,
         producerStateManager, _topicId = None, keepPartitionMetadataFile = 
true) {
       // Throw an error in getFirstBatchTimestampForSegments since it is 
called in grabFilthiestLog()
       override def getFirstBatchTimestampForSegments(segments: 
Iterable[LogSegment]): Iterable[Long] =
@@ -808,7 +809,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true)
@@ -862,7 +863,7 @@ class LogCleanerManagerTest extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 949e0c59df..19a52268f9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -106,6 +106,7 @@ class LogCleanerTest {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val logSegments = new LogSegments(topicPartition)
     val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, 
topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
@@ -129,7 +130,7 @@ class LogCleanerTest {
     val log = new UnifiedLog(offsets.logStartOffset,
                       localLog,
                       brokerTopicStats = new BrokerTopicStats,
-                      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+                      producerIdExpirationCheckIntervalMs = 
producerIdExpirationCheckIntervalMs,
                       leaderEpochCache = leaderEpochCache,
                       producerStateManager = producerStateManager,
                       _topicId = None,
@@ -1973,7 +1974,7 @@ class LogCleanerTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index db3222bb7f..c60e661a4b 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -150,7 +150,7 @@ class LogConcurrencyTest {
       time = Time.SYSTEM,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c6379ff3f3..168d6e0b05 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -49,6 +49,7 @@ class LogLoaderTest {
   val brokerTopicStats = new BrokerTopicStats
   val maxTransactionTimeoutMs: Int = 5 * 60 * 1000
   val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
+  val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val mockTime = new MockTime()
@@ -89,6 +90,7 @@ class LogLoaderTest {
 
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
 
     // Create a LogManager with some overridden methods to facilitate 
interception of clean shutdown
     // flag and to inject an error
@@ -109,6 +111,7 @@ class LogLoaderTest {
         retentionCheckMs = 1000L,
         maxTransactionTimeoutMs = maxTransactionTimeoutMs,
         maxPidExpirationMs = maxProducerIdExpirationMs,
+        producerIdExpirationCheckIntervalMs = 
producerIdExpirationCheckIntervalMs,
         interBrokerProtocolVersion = config.interBrokerProtocolVersion,
         scheduler = time.scheduler,
         brokerTopicStats = new BrokerTopicStats(),
@@ -149,7 +152,7 @@ class LogLoaderTest {
             offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, 
topicPartition,
             logDirFailureChannel)
           new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
-            LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
+            producerIdExpirationCheckIntervalMs, leaderEpochCache,
             producerStateManager, None, true)
         }
       }
@@ -246,7 +249,7 @@ class LogLoaderTest {
                         time: Time = mockTime,
                         maxTransactionTimeoutMs: Int = maxTransactionTimeoutMs,
                         maxProducerIdExpirationMs: Int = 
maxProducerIdExpirationMs,
-                        producerIdExpirationCheckIntervalMs: Int = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+                        producerIdExpirationCheckIntervalMs: Int = 
producerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, 
logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, maxProducerIdExpirationMs, 
producerIdExpirationCheckIntervalMs, lastShutdownClean)
@@ -330,6 +333,7 @@ class LogLoaderTest {
     def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
       val maxTransactionTimeoutMs = 5 * 60 * 1000
       val maxProducerIdExpirationMs = 60 * 60 * 1000
+      val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
       val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
       val logDirFailureChannel = new LogDirFailureChannel(10)
       // Intercept all segment read calls
@@ -373,7 +377,7 @@ class LogLoaderTest {
         offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, 
topicPartition,
         logDirFailureChannel)
       new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
-        LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache, 
producerStateManager,
+        producerIdExpirationCheckIntervalMs, leaderEpochCache, 
producerStateManager,
         None, keepPartitionMetadataFile = true)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 1b2dd7809f..001c62aa2d 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -647,7 +647,7 @@ class LogManagerTest {
     val segmentBytes = 1024
 
     val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
-      5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+      5 * 60 * 1000, 60 * 60 * 1000, 10 * 60 * 1000)
 
     assertTrue(expectedSegmentsPerLog > 0)
     // calculate numMessages to append to logs. It'll create 
"expectedSegmentsPerLog" log segments with segment.bytes=1024
@@ -783,7 +783,7 @@ class LogManagerTest {
         recoveryPoint = 0,
         maxTransactionTimeoutMs = 5 * 60 * 1000,
         maxProducerIdExpirationMs = 5 * 60 * 1000,
-        producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+        producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
         scheduler = mockTime.scheduler,
         time = mockTime,
         brokerTopicStats = mockBrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 50af76f556..4ff4c25601 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -81,7 +81,7 @@ object LogTestUtils {
                 recoveryPoint: Long = 0L,
                 maxTransactionTimeoutMs: Int = 5 * 60 * 1000,
                 maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                producerIdExpirationCheckIntervalMs: Int = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+                producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
                 keepPartitionMetadataFile: Boolean = true,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 57409a1f03..9db288b529 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3505,7 +3505,7 @@ class UnifiedLogTest {
                         time: Time = mockTime,
                         maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
                         maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                        producerIdExpirationCheckIntervalMs: Int = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+                        producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 
1000,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None,
                         keepPartitionMetadataFile: Boolean = true): UnifiedLog 
= {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 5d5e462b5a..4e76dcd00b 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
-import kafka.log.{AppendOrigin, Defaults, LogConfig, LogManager, LogTestUtils, 
UnifiedLog}
+import kafka.log.{AppendOrigin, Defaults, LogConfig, LogTestUtils, UnifiedLog}
 import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
 import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer, 
LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
@@ -75,7 +75,7 @@ class DumpLogSegmentsTest {
       brokerTopicStats = new BrokerTopicStats,
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
+      producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
       logDirFailureChannel = new LogDirFailureChannel(10),
       topicId = None,
       keepPartitionMetadataFile = true
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 91c8f270f3..c39c73f728 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,7 +19,7 @@ package kafka.utils
 import java.util.Properties
 import java.util.concurrent.atomic._
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogManager, 
LogSegments, ProducerStateManager}
+import kafka.log.{LocalLog, UnifiedLog, LogConfig, LogLoader, LogSegments, 
ProducerStateManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.TestUtils.retry
 import org.junit.jupiter.api.Assertions._
@@ -120,6 +120,7 @@ class SchedulerTest {
     val brokerTopicStats = new BrokerTopicStats
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val producerIdExpirationCheckIntervalMs = 10 * 60 * 1000
     val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
@@ -144,7 +145,7 @@ class SchedulerTest {
       offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, 
logDirFailureChannel)
     val log = new UnifiedLog(logStartOffset = offsets.logStartOffset,
       localLog = localLog,
-      brokerTopicStats, LogManager.ProducerIdExpirationCheckIntervalMs,
+      brokerTopicStats, producerIdExpirationCheckIntervalMs,
       leaderEpochCache, producerStateManager,
       _topicId = None, keepPartitionMetadataFile = true)
     assertTrue(scheduler.taskRunning(log.producerExpireCheck))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981da..592d02dc4b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1346,6 +1346,7 @@ object TestUtils extends Logging {
                    retentionCheckMs = 1000L,
                    maxTransactionTimeoutMs = 5 * 60 * 1000,
                    maxPidExpirationMs = 60 * 60 * 1000,
+                   producerIdExpirationCheckIntervalMs = 10 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
                    brokerTopicStats = new BrokerTopicStats,
