Repository: kafka
Updated Branches:
  refs/heads/trunk 766dea94e -> 495184916

KAFKA-5132: abort long running transactions

Abort any ongoing transactions that haven't been touched for longer than the transaction timeout

Author: Damian Guy <[email protected]>
Reviewers: Jason Gustafson, Apurva Mehta, Ismael Juma, Guozhang Wang

Closes #2957 from dguy/kafka-5132

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49518491
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49518491
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49518491

Branch: refs/heads/trunk
Commit: 4951849163b1defea91129472b5354531407deb9
Parents: 766dea9
Author: Damian Guy <[email protected]>
Authored: Fri May 12 10:36:02 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri May 12 10:36:02 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala | 43 ++++++++++++-
 .../transaction/TransactionMarkerChannel.scala | 3 +-
 .../TransactionMarkerChannelManager.scala | 2 +-
 .../transaction/TransactionMetadata.scala | 4 +-
 .../transaction/TransactionStateManager.scala | 25 ++++++--
 .../main/scala/kafka/server/KafkaConfig.scala | 6 ++
 .../TransactionCoordinatorTest.scala | 66 ++++++++++++++++++++
 .../TransactionStateManagerTest.scala | 32 ++++++++++
 8 files changed, 170 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 38e725f..982e009 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -44,14 +44,15 @@ object TransactionCoordinator { config.transactionTopicReplicationFactor, config.transactionTopicSegmentBytes, config.transactionsLoadBufferSize, - config.transactionTopicMinISR) + config.transactionTopicMinISR, + config.transactionTransactionsExpiredTransactionCleanupIntervalMs) val pidManager = new ProducerIdManager(config.brokerId, zkUtils) val logManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time) val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId) val transactionMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnMarkerPurgatory, time) - new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, time) + new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, scheduler, time) } private def initTransactionError(error: Errors): InitPidResult = { @@ -76,6 +77,7 @@ class TransactionCoordinator(brokerId: Int, txnManager: TransactionStateManager, txnMarkerChannelManager: TransactionMarkerChannelManager, txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], + scheduler: Scheduler, time: Time) extends Logging { this.logIdent = "[Transaction Coordinator " + brokerId + "]: " @@ -383,11 +385,45 @@ class TransactionCoordinator(brokerId: Int, def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId) + private def expireTransactions(): Unit = { + + txnManager.transactionsToExpire().foreach{ idAndMetadata => + idAndMetadata.metadata synchronized { + if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId) + && idAndMetadata.metadata.pendingState.isEmpty) { + // bump the producerEpoch so that any further requests for this transactionalId 
will be fenced + idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort + idAndMetadata.metadata.prepareTransitionTo(Ongoing) + txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => { + if (errors != Errors.NONE) + warn(s"failed to append transactionalId ${idAndMetadata.transactionalId} to log during transaction expiry. errors:$errors") + else + handleEndTransaction(idAndMetadata.transactionalId, + idAndMetadata.metadata.pid, + idAndMetadata.metadata.producerEpoch, + TransactionResult.ABORT, + (errors: Errors) => { + if (errors != Errors.NONE) + warn(s"rollback of transactionalId: ${idAndMetadata.transactionalId} failed during transaction expiry. errors: $errors") + } + ) + }) + } + } + } + } + /** * Startup logic executed at the same time when the server starts up. */ def startup(enablePidExpiration: Boolean = true) { info("Starting up.") + scheduler.startup() + scheduler.schedule("transaction-expiration", + expireTransactions, + TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs, + TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs + ) if (enablePidExpiration) txnManager.enablePidExpiration() txnMarkerChannelManager.start() @@ -403,10 +439,11 @@ class TransactionCoordinator(brokerId: Int, def shutdown() { info("Shutting down.") isActive.set(false) + scheduler.shutdown() + txnMarkerPurgatory.shutdown() pidManager.shutdown() txnManager.shutdown() txnMarkerChannelManager.shutdown() - txnMarkerPurgatory.shutdown() info("Shutdown complete.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala index cad3ea5..e60bd40 100644 
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala @@ -166,9 +166,10 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName, pendingTxnMap.get(PendingTxnKey(metadataPartition, pid)) } - def clear(): Unit = { + def close(): Unit = { brokerStateMap.clear() pendingTxnMap.clear() + networkClient.close() } def removeStateForPartition(partition: Int): mutable.Iterable[Long] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 2c17564..1b7ea56 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -107,7 +107,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, def shutdown(): Unit = { interBrokerSendThread.shutdown() - transactionMarkerChannel.clear() + transactionMarkerChannel.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index d84e054..a81e47b 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -161,8 +161,8 @@ private[coordinator] class TransactionMetadata(val pid: Long, 
txnTimeoutMs == other.txnTimeoutMs && state.equals(other.state) && topicPartitions.equals(other.topicPartitions) && - transactionStartTime.equals(other.transactionStartTime) && - lastUpdateTimestamp.equals(other.lastUpdateTimestamp) + transactionStartTime == other.transactionStartTime && + lastUpdateTimestamp == other.lastUpdateTimestamp case _ => false } http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index e23324f..f5dc3c0 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -43,6 +43,7 @@ object TransactionManager { // default transaction management config values val DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7).toInt val DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt + val DefaultRemoveExpiredTransactionsIntervalMs = TimeUnit.MINUTES.toMillis(1).toInt } /** @@ -82,9 +83,9 @@ class TransactionStateManager(brokerId: Int, private val transactionTopicPartitionCount = getTransactionTopicPartitionCount def enablePidExpiration() { - scheduler.startup() - - // TODO: add transaction and pid expiration logic + if (!scheduler.isStarted) + scheduler.startup() + // TODO: add pid expiration logic } /** @@ -142,6 +143,19 @@ class TransactionStateManager(brokerId: Int, loadingPartitions.contains(partitionId) } + + def transactionsToExpire(): Iterable[TransactionalIdAndMetadata] = { + val now = time.milliseconds() + transactionMetadataCache.filter { case (_, metadata) => + metadata.state match { + case Ongoing => + metadata.transactionStartTime + metadata.txnTimeoutMs < now + case 
_ => false + } + }.map {case (id, metadata) => + TransactionalIdAndMetadata(id, metadata) + } + } /** * Gets the partition count of the transaction log topic from ZooKeeper. * If the topic does not exist, the default partition count is returned. @@ -445,4 +459,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I transactionLogReplicationFactor: Short = TransactionLog.DefaultReplicationFactor, transactionLogSegmentBytes: Int = TransactionLog.DefaultSegmentBytes, transactionLogLoadBufferSize: Int = TransactionLog.DefaultLoadBufferSize, - transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas) + transactionLogMinInsyncReplicas: Int = TransactionLog.DefaultMinInSyncReplicas, + removeExpiredTransactionsIntervalMs: Int = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs) + +case class TransactionalIdAndMetadata(transactionalId: String, metadata: TransactionMetadata) http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 708201a..76f6380 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -165,6 +165,7 @@ object Defaults { val TransactionsTopicReplicationFactor = TransactionLog.DefaultReplicationFactor val TransactionsTopicPartitions = TransactionLog.DefaultNumPartitions val TransactionsTopicSegmentBytes = TransactionLog.DefaultSegmentBytes + val TransactionsExpiredTransactionCleanupIntervalMS = TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault @@ -349,6 +350,8 @@ object KafkaConfig { val TransactionsTopicPartitionsProp = 
"transaction.state.log.num.partitions" val TransactionsTopicSegmentBytesProp = "transaction.state.log.segment.bytes" val TransactionsTopicReplicationFactorProp = "transaction.state.log.replication.factor" + val TransactionsExpiredTransactionCleanupIntervalMsProp = "transaction.expired.transaction.cleanup.interval.ms" + /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" @@ -594,6 +597,7 @@ object KafkaConfig { "Internal topic creation will fail until the cluster size meets this replication factor requirement." val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)." val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" + val TransactionsExpiredTransactionCleanupIntervalMsDoc = "The interval at which to rollback expired transactions" /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. 
" + @@ -799,6 +803,7 @@ object KafkaConfig { .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) .define(TransactionsTopicPartitionsProp, INT, Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) + .define(TransactionsExpiredTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsExpiredTransactionCleanupIntervalMS, atLeast(1), LOW, TransactionsExpiredTransactionCleanupIntervalMsDoc) /** ********* Kafka Metrics Configuration ***********/ .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc) @@ -1008,6 +1013,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val transactionTopicReplicationFactor = getShort(KafkaConfig.TransactionsTopicReplicationFactorProp) val transactionTopicPartitions = getInt(KafkaConfig.TransactionsTopicPartitionsProp) val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp) + val transactionTransactionsExpiredTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsExpiredTransactionCleanupIntervalMsProp) /** ********* Metric Configuration **************/ val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index cf773bb..a9f1bca 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -17,6 +17,7 @@ package kafka.coordinator.transaction import kafka.server.DelayedOperationPurgatory +import kafka.utils.MockScheduler import kafka.utils.timer.MockTimer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors @@ -46,12 +47,14 @@ class TransactionCoordinatorTest { private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("test", new MockTimer, reaperEnabled = false) private val partitions = mutable.Set[TopicPartition](new TopicPartition("topic1", 0)) + private val scheduler = new MockScheduler(time) val coordinator: TransactionCoordinator = new TransactionCoordinator(brokerId, pidManager, transactionManager, transactionMarkerChannelManager, txnMarkerPurgatory, + scheduler, time) var result: InitPidResult = _ @@ -613,6 +616,69 @@ class TransactionCoordinatorTest { EasyMock.verify(transactionManager) } + @Test + def shouldAbortExpiredTransactionsInOngoingState(): Unit = { + EasyMock.expect(transactionManager.transactionsToExpire()) + .andReturn(List(TransactionalIdAndMetadata(transactionalId, + new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds())))) + + // should bump the epoch and append to the log + val metadata = new TransactionMetadata(pid, (epoch + 1).toShort, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds()) + EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId), + EasyMock.eq(metadata), + EasyMock.capture(capturedErrorsCallback))) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + capturedErrorsCallback.getValue.apply(Errors.NONE) + } + }).once() + + EasyMock.expect(transactionManager.isCoordinatorFor(transactionalId)) + .andReturn(true) + EasyMock.expect(transactionManager.getTransactionState(transactionalId)) + .andReturn(Some(metadata)) + .once() + + // now should perform the 
rollback and append the state as PrepareAbort + val abortMetadata = metadata.copy() + abortMetadata.state = PrepareAbort + // need to allow for the time.sleep below + abortMetadata.lastUpdateTimestamp = time.milliseconds() + TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs + + EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId), + EasyMock.eq(abortMetadata), + EasyMock.capture(capturedErrorsCallback))) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = {} + }) + .once() + + EasyMock.replay(transactionManager, transactionMarkerChannelManager) + + coordinator.startup(false) + time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs) + scheduler.tick() + EasyMock.verify(transactionManager) + } + + @Test + def shouldNotAbortExpiredTransactionsThatHaveAPendingStateTransition(): Unit = { + val metadata = new TransactionMetadata(pid, epoch, 0, Ongoing, partitions, time.milliseconds(), time.milliseconds()) + metadata.prepareTransitionTo(PrepareCommit) + + EasyMock.expect(transactionManager.transactionsToExpire()) + .andReturn(List(TransactionalIdAndMetadata(transactionalId, + metadata))) + + EasyMock.replay(transactionManager, transactionMarkerChannelManager) + coordinator.startup(false) + + time.sleep(TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs) + scheduler.tick() + EasyMock.verify(transactionManager) + + } + private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState) = { val transactionId = "tid" EasyMock.expect(transactionManager.isCoordinatorFor(transactionId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/49518491/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 09a89dd..2a14898 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -305,6 +305,38 @@ class TransactionStateManagerTest { } @Test + def shouldOnlyConsiderTransactionsInTheOngoingStateForExpiry(): Unit = { + txnMetadata1.state = Ongoing + txnMetadata1.transactionStartTime = time.milliseconds() + transactionManager.addTransaction(txnId1, txnMetadata1) + transactionManager.addTransaction(txnId2, txnMetadata2) + + val ongoingButNotExpiring = txnMetadata1.copy() + ongoingButNotExpiring.txnTimeoutMs = 10000 + transactionManager.addTransaction("not-expiring", ongoingButNotExpiring) + + val prepareCommit = txnMetadata1.copy() + prepareCommit.state = PrepareCommit + transactionManager.addTransaction("pc", prepareCommit) + + val prepareAbort = txnMetadata1.copy() + prepareAbort.state = PrepareAbort + transactionManager.addTransaction("pa", prepareAbort) + + val committed = txnMetadata1.copy() + committed.state = CompleteCommit + transactionManager.addTransaction("cc", committed) + + val aborted = txnMetadata1.copy() + aborted.state = CompleteAbort + transactionManager.addTransaction("ca", aborted) + + time.sleep(2000) + val expiring = transactionManager.transactionsToExpire() + assertEquals(List(TransactionalIdAndMetadata(txnId1, txnMetadata1)), expiring) + } + + @Test def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = { verifyWritesTxnMarkersInPrepareState(PrepareCommit) }
