Github user danielschonfeld commented on the pull request:
https://github.com/apache/storm/pull/870#issuecomment-155494823
@revans2 this is what I had in mind. We call start() on this in worker.clj
and we pass this `FlushingService` to `DisruptorQueue` so each queue could
register a callable to flush the messages.
It uses a cached thread pool which will increase according to demand or
otherwise just re-use threads already created.
```
package backtype.storm.utils;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.*;
public class FlushingService implements Runnable {
private static final ScheduledThreadPoolExecutor TIMER = new
ScheduledThreadPoolExecutor(0);
private static final ExecutorService es =
Executors.newCachedThreadPool();
private final Collection<Callable> registeredRunnables =
Lists.newArrayList();
private ScheduledFuture<?> _future;
private final long _flushInterval;
public FlushingService(long flushInterval) {
_flushInterval = flushInterval;
}
public void registerCallable(Callable c) {
registeredRunnables.add(c);
}
public void run() {
try {
es.invokeAll(registeredRunnables);
} catch (InterruptedException e) {
//do something?
}
}
public void start() {
_future = TIMER.scheduleWithFixedDelay(this, _flushInterval,
_flushInterval, TimeUnit.MILLISECONDS);
synchronized(TIMER) {
TIMER.setCorePoolSize(TIMER.getCorePoolSize() + 1);
}
}
public void close() {
if (_future != null) {
_future.cancel(true);
_future = null;
synchronized(TIMER) {
TIMER.setCorePoolSize(TIMER.getCorePoolSize() - 1);
}
}
}
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---