Repository: incubator-streams Updated Branches: refs/heads/master 536f85d34 -> 4f6f7d5f5
STREAMS-252 | Fixed monitor executor to allow for a BroadcastMonitorThread for each stream component Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9b9200e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9b9200e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9b9200e0 Branch: refs/heads/master Commit: 9b9200e063f87c04d9079593f4a348f8d29be92a Parents: 11adec3 Author: Robert Douglas <[email protected]> Authored: Tue Dec 9 13:14:56 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Tue Dec 9 13:14:56 2014 -0600 ---------------------------------------------------------------------- .../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9b9200e0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 5cf8f64..daef084 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -203,14 +203,16 @@ public class LocalStreamBuilder implements StreamBuilder { attachShutdownHandler(); boolean isRunning = true; this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); - this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); + this.monitor = Executors.newCachedThreadPool(); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); tasks = new HashMap<String, List<StreamsTask>>(); boolean forcedShutDown = false; try { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); - this.monitor.submit(monitorThread); + if(this.useDeprecatedMonitors) { + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + this.monitor.submit(monitorThread); + } setupComponentTasks(tasks); setupProviderTasks(provTasks); LOGGER.info("Started stream with {} components", tasks.size()); @@ -320,10 +322,10 @@ public class LocalStreamBuilder implements StreamBuilder { this.futures.put(task, this.executor.submit(task)); compTasks.add(task); if(this.useDeprecatedMonitors && comp.isOperationCountable() ) { - this.monitor.submit(broadcastMonitor); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } + this.monitor.submit(broadcastMonitor); } streamsTasks.put(comp.getId(), compTasks); }
