Repository: kafka
Updated Branches:
  refs/heads/trunk 64fc1a7ca -> 20e200878


KAFKA-5279: TransactionCoordinator must expire transactionalIds

Remove transactions that have not been updated for at least 
`transactional.id.expiration.ms`.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Apurva Mehta, Guozhang Wang

Closes #3101 from dguy/kafka-5279


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20e20087
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20e20087
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20e20087

Branch: refs/heads/trunk
Commit: 20e2008785d46aa0500b02d8737380c50d66da3b
Parents: 64fc1a7
Author: Damian Guy <damian....@gmail.com>
Authored: Thu May 25 11:01:10 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu May 25 11:01:10 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    |  22 ++--
 .../transaction/TransactionMetadata.scala       |  16 ++-
 .../transaction/TransactionStateManager.scala   |  98 ++++++++++++++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  16 ++-
 .../TransactionCoordinatorTest.scala            |  11 +-
 .../TransactionStateManagerTest.scala           | 119 ++++++++++++++++++-
 6 files changed, 252 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index b31c0bc..f182420 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -45,7 +45,9 @@ object TransactionCoordinator {
       config.transactionTopicSegmentBytes,
       config.transactionsLoadBufferSize,
       config.transactionTopicMinISR,
-      config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
+      config.transactionAbortTimedOutTransactionCleanupIntervalMs,
+      config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
+      config.requestTimeoutMs)
 
     val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
     // we do not need to turn on reaper thread since no tasks will be expired 
and there are no completed tasks to be purged
@@ -404,8 +406,8 @@ class TransactionCoordinator(brokerId: Int,
 
   def partitionFor(transactionalId: String): Int = 
txnManager.partitionFor(transactionalId)
 
-  private def expireTransactions(): Unit = {
-    txnManager.transactionsToExpire().foreach { txnIdAndPidEpoch =>
+  private def abortTimedOutTransactions(): Unit = {
+    txnManager.timedOutTransactions().foreach { txnIdAndPidEpoch =>
       handleEndTransaction(txnIdAndPidEpoch.transactionalId,
         txnIdAndPidEpoch.producerId,
         txnIdAndPidEpoch.producerEpoch,
@@ -426,16 +428,16 @@ class TransactionCoordinator(brokerId: Int,
   /**
    * Startup logic executed at the same time when the server starts up.
    */
-  def startup(enablePidExpiration: Boolean = true) {
+  def startup(enableTransactionalIdExpiration: Boolean = true) {
     info("Starting up.")
     scheduler.startup()
-    scheduler.schedule("transaction-expiration",
-      expireTransactions,
-      TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs,
-      TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs
+    scheduler.schedule("transaction-abort",
+      abortTimedOutTransactions,
+      TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs,
+      TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
     )
-    if (enablePidExpiration)
-      txnManager.enableProducerIdExpiration()
+    if (enableTransactionalIdExpiration)
+      txnManager.enableTransactionalIdExpiration()
     txnMarkerChannelManager.start()
     isActive.set(true)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index e1abf0e..5956f1d 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -69,6 +69,11 @@ private[transaction] case object CompleteCommit extends 
TransactionState { val b
  */
 private[transaction] case object CompleteAbort extends TransactionState { val 
byte: Byte = 5 }
 
+/**
+  * TransactionalId has expired and is about to be removed from the 
transaction cache
+  */
+private[transaction] case object Dead extends TransactionState { val byte: 
Byte = 6 }
+
 private[transaction] object TransactionMetadata {
   def apply(transactionalId: String, producerId: Long, producerEpoch: Short, 
txnTimeoutMs: Int, timestamp: Long) =
     new TransactionMetadata(transactionalId, producerId, producerEpoch, 
txnTimeoutMs, Empty,
@@ -87,6 +92,7 @@ private[transaction] object TransactionMetadata {
       case 3 => PrepareAbort
       case 4 => CompleteCommit
       case 5 => CompleteAbort
+      case 6 => Dead
       case unknown => throw new IllegalStateException("Unknown transaction 
state byte " + unknown + " from the transaction status message")
     }
   }
@@ -100,7 +106,8 @@ private[transaction] object TransactionMetadata {
       PrepareCommit -> Set(Ongoing),
       PrepareAbort -> Set(Ongoing),
       CompleteCommit -> Set(PrepareCommit),
-      CompleteAbort -> Set(PrepareAbort))
+      CompleteAbort -> Set(PrepareAbort),
+      Dead -> Set(Empty, CompleteAbort, CompleteCommit))
 }
 
 // this is a immutable object representing the target transition of the 
transaction metadata
@@ -141,7 +148,7 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
                                                var state: TransactionState,
                                                val topicPartitions: 
mutable.Set[TopicPartition],
                                                @volatile var 
txnStartTimestamp: Long = -1,
-                                               var txnLastUpdateTimestamp: 
Long) extends Logging {
+                                               @volatile var 
txnLastUpdateTimestamp: Long) extends Logging {
 
   // pending state is used to indicate the state that this transaction is 
going to
   // transit to, and for blocking future attempts to transit it again if it is 
not legal;
@@ -207,6 +214,11 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
     prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, 
Set.empty[TopicPartition], txnStartTimestamp, updateTimestamp)
   }
 
+
+  def prepareDead: TxnTransitMetadata = {
+    prepareTransitionTo(Dead, producerEpoch, txnTimeoutMs, 
Set.empty[TopicPartition], txnStartTimestamp, txnLastUpdateTimestamp)
+  }
+
   private def prepareTransitionTo(newState: TransactionState,
                                   newEpoch: Short,
                                   newTxnTimeoutMs: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e8f8e4d..0d7b5c4 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.common.KafkaException
 import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
+import kafka.server.Defaults
+import kafka.utils.CoreUtils.inLock
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
@@ -43,10 +45,10 @@ import scala.collection.JavaConverters._
 
 object TransactionStateManager {
   // default transaction management config values
-  // TODO: this needs to be replaces by the config values
   val DefaultTransactionsMaxTimeoutMs: Int = 
TimeUnit.MINUTES.toMillis(15).toInt
   val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt
-  val DefaultRemoveExpiredTransactionsIntervalMs: Int = 
TimeUnit.MINUTES.toMillis(1).toInt
+  val DefaultAbortTimedOutTransactionsIntervalMs: Int = 
TimeUnit.MINUTES.toMillis(1).toInt
+  val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = 
TimeUnit.HOURS.toMillis(1).toInt
 }
 
 /**
@@ -89,7 +91,7 @@ class TransactionStateManager(brokerId: Int,
   // txn timeout value, we do not need to grab the lock on the metadata object 
upon checking its state
   // since the timestamp is volatile and we will get the lock when actually 
trying to transit the transaction
   // metadata to abort later.
-  def transactionsToExpire(): Iterable[TransactionalIdAndProducerIdEpoch] = {
+  def timedOutTransactions(): Iterable[TransactionalIdAndProducerIdEpoch] = {
     val now = time.milliseconds()
     inReadLock(stateLock) {
       transactionMetadataCache.filter { case (txnPartitionId, _) =>
@@ -112,8 +114,87 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableProducerIdExpiration() {
-    // TODO: add producer id expiration logic
+
+
+  def enableTransactionalIdExpiration() {
+    scheduler.schedule("transactionalId-expiration", () => {
+      val now = time.milliseconds()
+      inReadLock(stateLock) {
+        val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
+          transactionMetadataCache.flatMap { case (partition, entry) =>
+            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
+              case Empty | CompleteCommit | CompleteAbort => true
+              case _ => false
+            }
+            }.filter { case (_, txnMetadata) =>
+              txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
+            }.map { case (transactionalId, txnMetadata) =>
+              val txnMetadataTransition = txnMetadata synchronized {
+                txnMetadata.prepareDead
+              }
+              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, 
entry.coordinatorEpoch, txnMetadataTransition)
+            }
+          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
+            
partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
+          }
+
+        val recordsPerPartition = transactionalIdByPartition
+          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) 
=>
+            val deletes: Array[SimpleRecord] = 
transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
+              new SimpleRecord(now, 
TransactionLog.keyToBytes(entry.transactionalId), null)
+            }.toArray
+            val records = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
+            val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+            (topicPartition, records)
+          }
+
+
+        def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
+          responses.foreach { case (topicPartition, response) =>
+            response.error match {
+              case Errors.NONE =>
+                inReadLock(stateLock) {
+                  val toRemove = 
transactionalIdByPartition(topicPartition.partition())
+                  transactionMetadataCache.get(topicPartition.partition)
+                    .foreach { txnMetadataCacheEntry =>
+                      toRemove.foreach { idCoordinatorEpochAndMetadata =>
+                        val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
+                        txnMetadata synchronized {
+                          if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
+                            && txnMetadata.pendingState.contains(Dead)
+                            && txnMetadata.producerEpoch == 
idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
+                          )
+                            
txnMetadataCacheEntry.metadataPerTransactionalId.remove(idCoordinatorEpochAndMetadata.transactionalId)
+                          else {
+                             debug(s"failed to remove expired transactionalId: 
${idCoordinatorEpochAndMetadata.transactionalId}" +
+                               s" from cache. pendingState: 
${txnMetadata.pendingState} producerEpoch: ${txnMetadata.producerEpoch}" +
+                               s" expected producerEpoch: 
${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}" +
+                               s" coordinatorEpoch: 
${txnMetadataCacheEntry.coordinatorEpoch} expected coordinatorEpoch: " +
+                               
s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
+                            txnMetadata.pendingState = None
+                          }
+                        }
+                      }
+                    }
+                }
+              case _ =>
+                debug(s"writing transactionalId tombstones for partition: 
${topicPartition.partition} failed with error: ${response.error.message()}")
+            }
+          }
+        }
+
+        replicaManager.appendRecords(
+          config.requestTimeoutMs,
+          TransactionLog.EnforcedRequiredAcks,
+          internalTopicsAllowed = true,
+          isFromClient = false,
+          recordsPerPartition,
+          removeFromCacheCallback,
+          None
+        )
+      }
+
+    }, delay = config.removeExpiredTransactionalIdsIntervalMs, period = 
config.removeExpiredTransactionalIdsIntervalMs)
   }
 
   /**
@@ -524,8 +605,13 @@ private[transaction] case class 
TransactionConfig(transactionalIdExpirationMs: I
                                                   transactionLogSegmentBytes: 
Int = TransactionLog.DefaultSegmentBytes,
                                                   
transactionLogLoadBufferSize: Int = TransactionLog.DefaultLoadBufferSize,
                                                   
transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas,
-                                                  
removeExpiredTransactionsIntervalMs: Int = 
TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
+                                                  
abortTimedOutTransactionsIntervalMs: Int = 
TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs,
+                                                  
removeExpiredTransactionalIdsIntervalMs: Int = 
TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs,
+                                                  requestTimeoutMs: Int = 
Defaults.RequestTimeoutMs)
 
 case class TransactionalIdAndProducerIdEpoch(transactionalId: String, 
producerId: Long, producerEpoch: Short)
 
 case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, 
coordinatorEpoch: Int)
+case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String,
+                                                      coordinatorEpoch: Int,
+                                                      transitMetadata: 
TxnTransitMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 99eddab..de036a7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -165,7 +165,8 @@ object Defaults {
   val TransactionsTopicReplicationFactor = 
TransactionLog.DefaultReplicationFactor
   val TransactionsTopicPartitions = TransactionLog.DefaultNumPartitions
   val TransactionsTopicSegmentBytes = TransactionLog.DefaultSegmentBytes
-  val TransactionsExpiredTransactionCleanupIntervalMS = 
TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs
+  val TransactionsAbortTimedOutTransactionsCleanupIntervalMS = 
TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
+  val TransactionsRemoveExpiredTransactionsCleanupIntervalMS = 
TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefault = 
ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -350,7 +351,8 @@ object KafkaConfig {
   val TransactionsTopicPartitionsProp = "transaction.state.log.num.partitions"
   val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes"
   val TransactionsTopicReplicationFactorProp = 
"transaction.state.log.replication.factor"
-  val TransactionsExpiredTransactionCleanupIntervalMsProp = 
"transaction.expired.transaction.cleanup.interval.ms"
+  val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = 
"transaction.abort.timed.out.transaction.cleanup.interval.ms"
+  val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = 
"transaction.remove.expired.transaction.cleanup.interval.ms"
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
@@ -597,7 +599,8 @@ object KafkaConfig {
     "Internal topic creation will fail until the cluster size meets this 
replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the 
transaction topic (should not change after deployment)."
   val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes 
should be kept relatively small in order to facilitate faster log compaction 
and cache loads"
-  val TransactionsExpiredTransactionCleanupIntervalMsDoc = "The interval at 
which to rollback expired transactions"
+  val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at 
which to rollback transactions that have timed out"
+  val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at 
which to remove transactions that have expired due to 
<code>transactional.id.expiration.ms</code> passing"
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when 
dynamic default quotas are not configured for <user>, <client-id> or <user, 
client-id> in Zookeeper. " +
@@ -803,7 +806,8 @@ object KafkaConfig {
       .define(TransactionsTopicReplicationFactorProp, SHORT, 
Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, 
TransactionsTopicReplicationFactorDoc)
       .define(TransactionsTopicPartitionsProp, INT, 
Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, 
TransactionsTopicPartitionsDoc)
       .define(TransactionsTopicSegmentBytesProp, INT, 
Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, 
TransactionsTopicSegmentBytesDoc)
-      .define(TransactionsExpiredTransactionCleanupIntervalMsProp, INT, 
Defaults.TransactionsExpiredTransactionCleanupIntervalMS, atLeast(1), LOW, 
TransactionsExpiredTransactionCleanupIntervalMsDoc)
+      .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, 
Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), 
LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
+      .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, 
INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, 
atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
 
       /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, 
atLeast(1), LOW, MetricNumSamplesDoc)
@@ -1013,7 +1017,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val transactionTopicReplicationFactor = 
getShort(KafkaConfig.TransactionsTopicReplicationFactorProp)
   val transactionTopicPartitions = 
getInt(KafkaConfig.TransactionsTopicPartitionsProp)
   val transactionTopicSegmentBytes = 
getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
-  val transactionTransactionsExpiredTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsExpiredTransactionCleanupIntervalMsProp)
+  val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
+  val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
+
 
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index e225588..4d953eb 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -519,14 +519,15 @@ class TransactionCoordinatorTest {
     val txnMetadata = new TransactionMetadata(transactionalId, producerId, 
producerEpoch, txnTimeoutMs, Ongoing,
       partitions, now, now)
 
-    EasyMock.expect(transactionManager.transactionsToExpire())
+
+    EasyMock.expect(transactionManager.timedOutTransactions())
       .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, 
producerId, producerEpoch)))
     
EasyMock.expect(transactionManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId),
 EasyMock.anyObject()))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
       .once()
 
     val expectedTransition = TxnTransitMetadata(producerId, producerEpoch, 
txnTimeoutMs, PrepareAbort,
-      partitions.toSet, now, now + 
TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
+      partitions.toSet, now, now + 
TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
 
     
EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
@@ -540,7 +541,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.startup(false)
-    
time.sleep(TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    
time.sleep(TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
     scheduler.tick()
     EasyMock.verify(transactionManager)
   }
@@ -551,7 +552,7 @@ class TransactionCoordinatorTest {
       partitions, time.milliseconds(), time.milliseconds())
     metadata.prepareAbortOrCommit(PrepareCommit, time.milliseconds())
 
-    EasyMock.expect(transactionManager.transactionsToExpire())
+    EasyMock.expect(transactionManager.timedOutTransactions())
       .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, 
producerId, producerEpoch)))
     
EasyMock.expect(transactionManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId),
 EasyMock.anyObject[Option[TransactionMetadata]]()))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
metadata))))
@@ -559,7 +560,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.startup(false)
-    
time.sleep(TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs)
+    
time.sleep(TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs)
     scheduler.tick()
     EasyMock.verify(transactionManager)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/20e20087/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 8682026..479f99b 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -316,7 +316,7 @@ class TransactionStateManagerTest {
   }
 
   @Test
-  def shouldOnlyConsiderTransactionsInTheOngoingStateForExpiry(): Unit = {
+  def shouldOnlyConsiderTransactionsInTheOngoingStateToAbort(): Unit = {
     for (partitionId <- 0 until numPartitions) {
       transactionManager.addLoadedTransactionsToCache(partitionId, 0, new 
Pool[String, TransactionMetadata]())
     }
@@ -329,7 +329,7 @@ class TransactionStateManagerTest {
     transactionManager.getAndMaybeAddTransactionState("complete-abort", 
Some(transactionMetadata("complete-abort", producerId = 5, state = 
CompleteAbort)))
 
     time.sleep(2000)
-    val expiring = transactionManager.transactionsToExpire()
+    val expiring = transactionManager.timedOutTransactions()
     assertEquals(List(TransactionalIdAndProducerIdEpoch("ongoing", 0, 0)), 
expiring)
   }
 
@@ -343,6 +343,121 @@ class TransactionStateManagerTest {
     verifyWritesTxnMarkersInPrepareState(PrepareAbort)
   }
 
+  @Test
+  def shouldRemoveCompleteCommitExpiredTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteCommit)
+    verifyMetadataDoesntExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldRemoveCompleteAbortExpiredTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, CompleteAbort)
+    verifyMetadataDoesntExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldRemoveEmptyExpiredTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, Empty)
+    verifyMetadataDoesntExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldNotRemoveExpiredTransactionalIdsIfLogAppendFails(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NOT_ENOUGH_REPLICAS, 
CompleteAbort)
+    verifyMetadataDoesExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldNotRemoveOngoingTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, Ongoing)
+    verifyMetadataDoesExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldNotRemovePrepareAbortTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareAbort)
+    verifyMetadataDoesExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  @Test
+  def shouldNotRemovePrepareCommitTransactionalIds(): Unit = {
+    setupAndRunTransactionalIdExpiration(Errors.NONE, PrepareCommit)
+    verifyMetadataDoesExist(transactionalId1)
+    verifyMetadataDoesExist(transactionalId2)
+  }
+
+  private def verifyMetadataDoesExist(transactionalId: String) = {
+    transactionManager.getAndMaybeAddTransactionState(transactionalId, None) 
match {
+      case Left(errors) => fail("shouldn't have been any errors")
+      case Right(None) => fail("metadata should not have been removed")
+      case Right(Some(metadata)) => // ok
+    }
+  }
+
+  private def verifyMetadataDoesntExist(transactionalId: String) = {
+    transactionManager.getAndMaybeAddTransactionState(transactionalId, None) 
match {
+      case Left(errors) => fail("shouldn't have been any errors")
+      case Right(Some(metadata)) => fail("metadata should have been removed")
+      case Right(None) => // ok
+    }
+  }
+
+  private def setupAndRunTransactionalIdExpiration(error: Errors, txnState: 
TransactionState) = {
+    for (partitionId <- 0 until numPartitions) {
+      transactionManager.addLoadedTransactionsToCache(partitionId, 0, new 
Pool[String, TransactionMetadata]())
+    }
+
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => 
Unit] = EasyMock.newCapture()
+
+    val partition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, 
transactionManager.partitionFor(transactionalId1))
+    val recordsByPartition = Map(partition -> 
MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType,
+      new SimpleRecord(time.milliseconds() + 
txnConfig.removeExpiredTransactionalIdsIntervalMs, 
TransactionLog.keyToBytes(transactionalId1), null)))
+
+    txnState match {
+      case Empty | CompleteCommit | CompleteAbort =>
+
+        EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+          EasyMock.eq((-1).toShort),
+          EasyMock.eq(true),
+          EasyMock.eq(false),
+          EasyMock.eq(recordsByPartition),
+          EasyMock.capture(capturedArgument),
+          EasyMock.eq(None)
+        )).andAnswer(new IAnswer[Unit] {
+          override def answer(): Unit = {
+            capturedArgument.getValue.apply(
+              Map(partition ->
+                new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+              )
+            )
+          }
+        })
+      case _ => // shouldn't append
+    }
+
+    EasyMock.replay(replicaManager)
+
+    txnMetadata1.txnLastUpdateTimestamp = time.milliseconds() - 
txnConfig.transactionalIdExpirationMs
+    txnMetadata1.state = txnState
+    transactionManager.getAndMaybeAddTransactionState(transactionalId1, 
Some(txnMetadata1))
+
+    txnMetadata2.txnLastUpdateTimestamp = time.milliseconds()
+    transactionManager.getAndMaybeAddTransactionState(transactionalId2, 
Some(txnMetadata2))
+
+    transactionManager.enableTransactionalIdExpiration()
+    time.sleep(txnConfig.removeExpiredTransactionalIdsIntervalMs)
+
+    scheduler.tick()
+
+    EasyMock.verify(replicaManager)
+  }
+
   private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): 
Unit = {
     txnMetadata1.state = state
     txnMetadata1.addPartitions(Set[TopicPartition](new 
TopicPartition("topic1", 0),

Reply via email to