STREAMS-252 | Fix issue that was causing some unit tests to fail
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/af7aabf4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/af7aabf4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/af7aabf4 Branch: refs/heads/master Commit: af7aabf45c4f938db305f311edda6962bd5140f7 Parents: c466a2c Author: Robert Douglas <[email protected]> Authored: Thu Dec 11 16:54:55 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Thu Dec 11 16:54:55 2014 -0600 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 35 +++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/af7aabf4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index daef084..9775727 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -22,10 +22,7 @@ import org.apache.streams.core.*; import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; import org.apache.streams.local.queues.ThroughputQueue; -import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; -import org.apache.streams.local.tasks.StatusCounterMonitorThread; -import org.apache.streams.local.tasks.StreamsProviderTask; -import org.apache.streams.local.tasks.StreamsTask; +import org.apache.streams.local.tasks.*; import org.apache.streams.monitoring.tasks.BroadcastMonitorThread; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -203,16 +200,14 @@ public class LocalStreamBuilder implements StreamBuilder { attachShutdownHandler(); boolean isRunning = true; this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); - this.monitor = Executors.newCachedThreadPool(); + this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); tasks = new HashMap<String, List<StreamsTask>>(); boolean forcedShutDown = false; try { - if(this.useDeprecatedMonitors) { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); - this.monitor.submit(monitorThread); - } + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + this.monitor.submit(monitorThread); setupComponentTasks(tasks); setupProviderTasks(provTasks); LOGGER.info("Started stream with {} components", tasks.size()); @@ -222,7 +217,13 @@ public class LocalStreamBuilder implements StreamBuilder { isRunning = isRunning || task.isRunning(); } for(StreamComponent task: components.values()) { - isRunning = isRunning || task.getInBoundQueue().size() > 0; + boolean tasksRunning = false; + for(StreamsTask t : task.getStreamsTasks()) { + if(t instanceof BaseStreamsTask) { + tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning(); + } + } + isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0); } if(isRunning) { Thread.sleep(3000); @@ -321,11 +322,13 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.futures.put(task, this.executor.submit(task)); compTasks.add(task); - if(this.useDeprecatedMonitors && comp.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); + if(comp.isOperationCountable() ) { + if(this.useDeprecatedMonitors) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); + } + this.monitor.submit(broadcastMonitor); } - this.monitor.submit(broadcastMonitor); } streamsTasks.put(comp.getId(), compTasks); } @@ -426,7 +429,7 @@ public class LocalStreamBuilder implements StreamBuilder { } protected int getTimeout() { - //Set the timeout of it is configured, otherwise signal downstream components to use their default + //Set the timeout of it is configured, otherwise signal downstream components to use their default return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1; } @@ -441,4 +444,4 @@ public class LocalStreamBuilder implements StreamBuilder { } } -} +} \ No newline at end of file
