Repository: kafka Updated Branches: refs/heads/trunk d04daf570 -> 3cfbb25c6
MINOR: Handle error metrics removal during shutdown Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #4187 from rajinisivaram/MINOR-metrics-cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfbb25c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfbb25c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfbb25c Branch: refs/heads/trunk Commit: 3cfbb25c616bee44ac181e9320d4fd6d79ab9c58 Parents: d04daf5 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Wed Nov 15 09:54:42 2017 +0000 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Wed Nov 15 09:54:42 2017 +0000 ---------------------------------------------------------------------- .../scala/kafka/network/RequestChannel.scala | 7 +++++- .../main/scala/kafka/network/SocketServer.scala | 24 ++++++++++++++++---- .../kafka/server/KafkaRequestHandler.scala | 1 - .../main/scala/kafka/server/KafkaServer.scala | 9 +++++++- .../unit/kafka/network/SocketServerTest.scala | 7 +++--- 5 files changed, 37 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a4ec5e3..a50af45 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } } - def shutdown() { + def clear() { requestQueue.clear() } + def shutdown() { + clear() + metrics.close() + } + def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index bea8f79..4366fea 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _ + private var stoppedProcessingRequests = false /** * Start the socket server @@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time requestChannel.addResponseListener(id => processors(id).wakeup()) /** - * Shutdown the socket server - */ - def shutdown() = { - info("Shutting down") + * Stop processing requests and new connections. + */ + def stopProcessingRequests() = { + info("Stopping socket server request processors") this.synchronized { acceptors.values.foreach(_.shutdown) processors.foreach(_.shutdown) + requestChannel.clear() + stoppedProcessingRequests = true + } + info("Stopped socket server request processors") + } + + /** + * Shutdown the socket server. If still processing requests, shutdown + * acceptors and processors first. + */ + def shutdown() = { + info("Shutting down socket server") + this.synchronized { + if (!stoppedProcessingRequests) + stopProcessingRequests() requestChannel.shutdown() } info("Shutdown completed") http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 3d8dbd9..a498781 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.initiateShutdown() for (handler <- runnables) handler.awaitShutdown() - requestChannel.metrics.close() info("shut down completely") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a0732fd..a13f5af 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -526,8 +526,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (dynamicConfigManager != null) CoreUtils.swallow(dynamicConfigManager.shutdown()) + // Stop socket server to stop accepting any more connections and requests. + // Socket server will be shutdown towards the end of the sequence. if (socketServer != null) - CoreUtils.swallow(socketServer.shutdown()) + CoreUtils.swallow(socketServer.stopProcessingRequests()) if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) @@ -558,6 +560,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (quotaManagers != null) CoreUtils.swallow(quotaManagers.shutdown()) + // Even though socket server is stopped much earlier, controller can generate + // response for controlled shutdown request. Shutdown server at the end to + // avoid any failures (e.g. when metrics are recorded) + if (socketServer != null) + CoreUtils.swallow(socketServer.shutdown()) if (metrics != null) CoreUtils.swallow(metrics.close()) if (brokerTopicStats != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 0c05c46..024e7f9 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -136,7 +136,6 @@ class SocketServerTest extends JUnitSuite { def shutdownServerAndMetrics(server: SocketServer): Unit = { server.shutdown() server.metrics.close() - server.requestChannel.metrics.close() } @After @@ -576,8 +575,8 @@ class SocketServerTest extends JUnitSuite { } @Test - def testRequestMetricsAfterShutdown(): Unit = { - server.shutdown() + def testRequestMetricsAfterStop(): Unit = { + server.stopProcessingRequests() server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark() server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) @@ -591,7 +590,7 @@ class SocketServerTest extends JUnitSuite { .collect { case (k, metric: Meter) => (k.toString, metric.count) } assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 }) - server.requestChannel.metrics.close() + server.shutdown() assertEquals(Map.empty, requestMetricMeters) }