Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2203#discussion_r155642325
  
    --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
    @@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType 
type, int size, long readTi
             _barrier = _buffer.newBarrier();
             _buffer.addGatingSequences(_consumer);
             _metrics = new QueueMetrics();
    +        _disruptorMetrics = 
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
             //The batch size can be no larger than half the full queue size.
             //This is mostly to avoid contention issues.
             _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
     
             _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
             _flusher.start();
    +        METRICS_TIMER.schedule(new TimerTask(){
    --- End diff --
    
    Won't this leak the task if the queue is shut down?


---

Reply via email to