Repository: kafka Updated Branches: refs/heads/trunk bcf447e93 -> 3085d4f43
KAFKA-5182; Close txn coordinator threads during broker shutdown Shutdown delayed delete purgatory thread, transaction marker purgatory thread and send thread in `TransactionMarkerChannelManager` during broker shutdown. Made `InterBrokerSendThread` interruptible so that it is shutdown. Author: Rajini Sivaram <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]> Closes #3014 from rajinisivaram/KAFKA-5182 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3085d4f4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3085d4f4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3085d4f4 Branch: refs/heads/trunk Commit: 3085d4f435bba74fc9aa077e23a4da3475b4d2ec Parents: bcf447e Author: Rajini Sivaram <[email protected]> Authored: Thu May 11 13:16:57 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu May 11 13:19:50 2017 +0100 ---------------------------------------------------------------------- .../kafka/common/InterBrokerSendThread.scala | 5 +++-- .../transaction/TransactionCoordinator.scala | 1 + .../scala/kafka/server/DelayedOperation.scala | 2 +- .../src/main/scala/kafka/server/KafkaServer.scala | 18 ++++++++++-------- .../main/scala/kafka/server/ReplicaManager.scala | 1 + 5 files changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/common/InterBrokerSendThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index b626954..217aa80 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -29,8 +29,9 @@ import org.apache.kafka.common.utils.Time class InterBrokerSendThread(name: String, networkClient: NetworkClient, requestGenerator: () => Iterable[RequestAndCompletionHandler], - time: Time) - extends ShutdownableThread(name, isInterruptible = false) { + time: Time, + isInterruptible: Boolean = true) + extends ShutdownableThread(name, isInterruptible) { override def doWork() { val now = time.milliseconds() http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 46c061e..38e725f 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -406,6 +406,7 @@ class TransactionCoordinator(brokerId: Int, pidManager.shutdown() txnManager.shutdown() txnMarkerChannelManager.shutdown() + txnMarkerPurgatory.shutdown() info("Shutdown complete.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 7436904..c0efc53 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -393,7 +393,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri * A background reaper to expire delayed operations that have timed out */ private class ExpiredOperationReaper extends ShutdownableThread( - "ExpirationReaper-%d".format(brokerId), + "ExpirationReaper-%d-%s".format(brokerId, purgatoryName), false) { override def doWork() { http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3c681e4..431d192 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -589,25 +589,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { CoreUtils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) - if(socketServer != null) + if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) - if(requestHandlerPool != null) + if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) CoreUtils.swallow(kafkaScheduler.shutdown()) - if(apis != null) + if (apis != null) CoreUtils.swallow(apis.close()) CoreUtils.swallow(authorizer.foreach(_.close())) - if(replicaManager != null) + if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown()) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown()) - if(groupCoordinator != null) + if (transactionCoordinator != null) + CoreUtils.swallow(transactionCoordinator.shutdown()) + if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown()) - if(logManager != null) + if (logManager != null) CoreUtils.swallow(logManager.shutdown()) - if(kafkaController != null) + if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) - if(zkUtils != null) + if (zkUtils != null) CoreUtils.swallow(zkUtils.close()) if (metrics != null) CoreUtils.swallow(metrics.close()) http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 663ab1e..99c1b45 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1096,6 +1096,7 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdown() delayedFetchPurgatory.shutdown() delayedProducePurgatory.shutdown() + delayedDeleteRecordsPurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() info("Shut down completely")
