[ 
https://issues.apache.org/jira/browse/STORM-1190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998941#comment-14998941
 ] 

ASF GitHub Bot commented on STORM-1190:
---------------------------------------

Github user danielschonfeld commented on the pull request:

    https://github.com/apache/storm/pull/870#issuecomment-155494823
  
    @revans2 this is what I had in mind. We call `start()` on this in 
worker.clj and pass the `FlushingService` to each `DisruptorQueue`, so every 
queue can register a callable that flushes its messages.
    
    It uses a cached thread pool, which grows on demand and otherwise reuses 
threads it has already created.
    
    ```
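    package backtype.storm.utils;

    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class FlushingService implements Runnable {
        // Shared timer; every started service adds one core thread to it.
        private static final ScheduledThreadPoolExecutor TIMER = new ScheduledThreadPoolExecutor(0);
        // Cached pool: grows on demand and reuses idle threads.
        private static final ExecutorService es = Executors.newCachedThreadPool();

        // Copy-on-write so queues can register while a flush pass is running.
        // Callable<Object> (not the raw type) is what invokeAll() needs to compile.
        private final List<Callable<Object>> registeredCallables = new CopyOnWriteArrayList<>();

        private ScheduledFuture<?> _future;
        private final long _flushInterval;

        public FlushingService(long flushInterval) {
            _flushInterval = flushInterval;
        }

        public void registerCallable(Callable<Object> c) {
            registeredCallables.add(c);
        }

        public void run() {
            try {
                // Blocks until every registered callable has finished.
                es.invokeAll(registeredCallables);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }

        public void start() {
            synchronized (TIMER) {
                // Grow the pool before scheduling so a core thread is available.
                TIMER.setCorePoolSize(TIMER.getCorePoolSize() + 1);
            }
            _future = TIMER.scheduleWithFixedDelay(this, _flushInterval, _flushInterval, TimeUnit.MILLISECONDS);
        }

        public void close() {
            if (_future != null) {
                _future.cancel(true);
                _future = null;
                synchronized (TIMER) {
                    TIMER.setCorePoolSize(TIMER.getCorePoolSize() - 1);
                }
            }
        }
    }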
    
    ```
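
To illustrate the registration idea described above, here is a minimal, self-contained sketch of one flush pass. It is not Storm code: `BufferingQueue`, `flushCallable()`, and `flushAll()` are hypothetical names that stand in for `DisruptorQueue` and for the body of `FlushingService.run()`. The only assumption carried over from the proposal is that each queue hands the service a callable and the service runs them all with `invokeAll()` on a cached thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FlushRegistrationSketch {

    /** Hypothetical stand-in for a queue that buffers messages until flushed. */
    static class BufferingQueue {
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        synchronized void publish(String msg) {
            pending.add(msg);
        }

        /** Moves all pending messages to delivered and returns how many moved. */
        synchronized int flush() {
            int n = pending.size();
            delivered.addAll(pending);
            pending.clear();
            return n;
        }

        /** The callable this queue would register with the flushing service. */
        Callable<Object> flushCallable() {
            return () -> Integer.valueOf(flush());
        }
    }

    /** One flush pass: runs every registered callable, returns the total flushed. */
    static int flushAll(ExecutorService es, List<Callable<Object>> callables) {
        try {
            int total = 0;
            // invokeAll blocks until every callable has completed.
            for (Future<Object> f : es.invokeAll(callables)) {
                total += (Integer) f.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Registers two queues, publishes three messages, and runs one pass. */
    static int demo() {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            List<Callable<Object>> registered = new CopyOnWriteArrayList<>();
            BufferingQueue a = new BufferingQueue();
            BufferingQueue b = new BufferingQueue();
            registered.add(a.flushCallable());
            registered.add(b.flushCallable());
            a.publish("x");
            a.publish("y");
            b.publish("z");
            return flushAll(es, registered);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("flushed " + demo() + " messages");
    }
}
```

In the actual proposal the pass would be triggered by the scheduled timer every `_flushInterval` milliseconds rather than called directly; the sketch leaves the timer out so the result does not depend on timing.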


> System load spikes in recent snapshot
> -------------------------------------
>
>                 Key: STORM-1190
>                 URL: https://issues.apache.org/jira/browse/STORM-1190
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 0.11.0
>         Environment: 10x (CoreOS stable (766.4.0) / k8s 1.0.1 / docker 
> running on Azure VMs)
>            Reporter: Michael Schonfeld
>            Priority: Critical
>         Attachments: Screenshot 2015-11-08 22.17.57.png, Screenshot 
> 2015-11-08 22.18.06.png
>
>
> We've been running Storm's snapshots on our production cluster for a little 
> while now (that back pressure support really helped us), and we've noticed a 
> sudden spike in system load when going from 
> commit@ba1250993d10ffc523c9f5464371fbeb406d216f to the current latest 
> commit@c12e28c829fcfabc0a3a775fb9714968b7e3e349. Both versions were running 
> the exact same topologies, and there was no significant change in workload. 
> Not exactly sure how to even begin to debug this, so we ended up just rolling 
> back. Thoughts?
> Stats screenshots attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
