[
https://issues.apache.org/jira/browse/STORM-1190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998941#comment-14998941
]
ASF GitHub Bot commented on STORM-1190:
---------------------------------------
Github user danielschonfeld commented on the pull request:
https://github.com/apache/storm/pull/870#issuecomment-155494823
@revans2 this is what I had in mind. We would call start() on this in worker.clj
and pass the `FlushingService` to each `DisruptorQueue`, so that every queue can
register a callable that flushes its messages.
It uses a cached thread pool, which grows on demand and otherwise reuses
threads that were already created.
```
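package backtype.storm.utils;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FlushingService implements Runnable {
    // Shared timer; its core pool size tracks the number of started services.
    private static final ScheduledThreadPoolExecutor TIMER = new ScheduledThreadPoolExecutor(0);
    // Shared cached pool that runs the registered flush callables.
    private static final ExecutorService es = Executors.newCachedThreadPool();

    // Typed as Callable<Object> because invokeAll() does not type-check with raw Callables.
    // Copy-on-write so queues can register while a flush is running.
    private final Collection<Callable<Object>> registeredCallables = new CopyOnWriteArrayList<Callable<Object>>();
    private ScheduledFuture<?> _future;
    private final long _flushInterval;

    public FlushingService(long flushInterval) {
        _flushInterval = flushInterval;
    }

    public void registerCallable(Callable<Object> c) {
        registeredCallables.add(c);
    }

    // Invoked by TIMER: runs every registered callable and waits for all of them.
    public void run() {
        try {
            es.invokeAll(registeredCallables);
        } catch (InterruptedException e) {
            // close() cancels with interruption; restore the flag and return.
            Thread.currentThread().interrupt();
        }
    }

    public synchronized void start() {
        // Grow the timer pool before scheduling so the task always has a core thread.
        synchronized (TIMER) {
            TIMER.setCorePoolSize(TIMER.getCorePoolSize() + 1);
        }
        _future = TIMER.scheduleWithFixedDelay(this, _flushInterval, _flushInterval, TimeUnit.MILLISECONDS);
    }

    public synchronized void close() {
        if (_future != null) {
            _future.cancel(true);
            _future = null;
            synchronized (TIMER) {
                TIMER.setCorePoolSize(TIMER.getCorePoolSize() - 1);
            }
        }
    }
}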
```
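For illustration, here is a rough, self-contained sketch of the pattern described above: one timer thread periodically fans out flush callables to a cached thread pool via `invokeAll`. It does not use Storm's classes. `FlushDemo`, `BatchingQueue`, and `flushAll` are hypothetical names invented for this sketch, `BatchingQueue` only stands in for a queue that batches messages, and the lambdas assume Java 8.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FlushDemo {
    /** Hypothetical stand-in for a queue that batches messages until flushed. */
    static class BatchingQueue {
        private final AtomicInteger pending = new AtomicInteger();
        private final AtomicInteger flushed = new AtomicInteger();

        void publish() { pending.incrementAndGet(); }

        /** Moves everything pending to the flushed count; returns how many moved. */
        int flush() {
            int n = pending.getAndSet(0);
            flushed.addAndGet(n);
            return n;
        }

        int flushedCount() { return flushed.get(); }
    }

    /** Fills several queues, waits for one timed flush pass, returns the total flushed. */
    static int flushAll(int queueCount, int messagesPerQueue) {
        List<BatchingQueue> queues = new ArrayList<>();
        List<Callable<Integer>> flushers = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            BatchingQueue q = new BatchingQueue();
            for (int m = 0; m < messagesPerQueue; m++) q.publish();
            queues.add(q);
            flushers.add(q::flush); // each queue registers its own flush callable
        }

        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newCachedThreadPool();
        CountDownLatch firstPass = new CountDownLatch(1);
        try {
            timer.scheduleWithFixedDelay(() -> {
                try {
                    workers.invokeAll(flushers); // blocks until every flush has finished
                    firstPass.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 10, 10, TimeUnit.MILLISECONDS);

            if (!firstPass.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no flush pass completed in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for flush", e);
        } finally {
            timer.shutdownNow(); // stop the periodic task
            workers.shutdown();  // let idle pool threads exit
        }

        int total = 0;
        for (BatchingQueue q : queues) total += q.flushedCount();
        return total;
    }
}
```

Calling `FlushDemo.flushAll(3, 5)` publishes five messages to each of three queues and returns 15 once the first flush pass has completed. The sketch uses a single-threaded timer for simplicity rather than resizing a shared `ScheduledThreadPoolExecutor`.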
> System load spikes in recent snapshot
> -------------------------------------
>
> Key: STORM-1190
> URL: https://issues.apache.org/jira/browse/STORM-1190
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core
> Affects Versions: 0.11.0
> Environment: 10x (CoreOS stable (766.4.0) / k8s 1.0.1 / docker running on Azure VMs)
> Reporter: Michael Schonfeld
> Priority: Critical
> Attachments: Screenshot 2015-11-08 22.17.57.png, Screenshot 2015-11-08 22.18.06.png
>
>
> We've been running Storm's snapshots on our production cluster for a little
> while now (that back pressure support really helped us), and we've noticed a
> sudden spike in system load when going from
> commit@ba1250993d10ffc523c9f5464371fbeb406d216f to the current latest
> commit@c12e28c829fcfabc0a3a775fb9714968b7e3e349. Both versions were running
> the exact same topologies, and there was no significant change in workload.
> Not exactly sure how to even begin to debug this, so we ended up just rolling
> back. Thoughts?
> Stats screenshots attached