Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2203#discussion_r155642325

    --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
    @@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi
             _barrier = _buffer.newBarrier();
             _buffer.addGatingSequences(_consumer);
             _metrics = new QueueMetrics();
    +        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
             //The batch size can be no larger than half the full queue size.
             //This is mostly to avoid contention issues.
             _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
             _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
             _flusher.start();
    +        METRICS_TIMER.schedule(new TimerTask(){
    --- End diff --

    Won't this leak the task if the queue is shut down?
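
    To illustrate the concern: a task scheduled on a shared java.util.Timer keeps firing until it is cancelled, and an anonymous TimerTask created in a constructor also holds a reference to the enclosing object. Below is a minimal sketch of one possible way to avoid that, assuming the queue keeps a handle to its task and cancels it on shutdown. SketchQueue, its fields, and its shutdown() method are hypothetical names for illustration only; this is not Storm's DisruptorQueue and not necessarily how the PR should resolve it.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a queue; NOT Storm's DisruptorQueue.
class SketchQueue {
    // Shared daemon timer, playing the role METRICS_TIMER appears to play in the diff (assumption).
    private static final Timer METRICS_TIMER = new Timer("sketch-metrics-timer", true);

    // Keep a handle to the task so it can be cancelled later.
    private final TimerTask metricsTask;

    // Counts how many times the periodic task has run.
    final AtomicInteger reports = new AtomicInteger();

    SketchQueue(long periodMs) {
        metricsTask = new TimerTask() {
            @Override
            public void run() {
                // Placeholder for real metrics reporting.
                reports.incrementAndGet();
            }
        };
        // Run immediately, then every periodMs milliseconds.
        METRICS_TIMER.schedule(metricsTask, 0L, periodMs);
    }

    // Cancels the periodic task. Returns true if this call prevented
    // future executions, false if the task was already cancelled.
    boolean shutdown() {
        boolean cancelled = metricsTask.cancel();
        // Drop cancelled tasks from the shared timer's queue so they
        // (and the queue they reference) can be garbage collected.
        METRICS_TIMER.purge();
        return cancelled;
    }

    public static void main(String[] args) {
        SketchQueue queue = new SketchQueue(1000L);
        System.out.println("first shutdown cancelled task: " + queue.shutdown());
        System.out.println("second shutdown cancelled task: " + queue.shutdown());
    }
}
```

    Without the cancel() call, the shared timer would keep invoking the task, and keep the queue reachable, for as long as the timer itself lives.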
---