Repository: incubator-streams
Updated Branches:
  refs/heads/master 536f85d34 -> 4f6f7d5f5


STREAMS-252 | Fixed monitor executor to allow for a BroadcastMonitorThread for 
each stream component


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9b9200e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9b9200e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9b9200e0

Branch: refs/heads/master
Commit: 9b9200e063f87c04d9079593f4a348f8d29be92a
Parents: 11adec3
Author: Robert Douglas <[email protected]>
Authored: Tue Dec 9 13:14:56 2014 -0600
Committer: Robert Douglas <[email protected]>
Committed: Tue Dec 9 13:14:56 2014 -0600

----------------------------------------------------------------------
 .../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9b9200e0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 5cf8f64..daef084 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -203,14 +203,16 @@ public class LocalStreamBuilder implements StreamBuilder {
         attachShutdownHandler();
         boolean isRunning = true;
         this.executor = new 
ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
-        this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
+        this.monitor = Executors.newCachedThreadPool();
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, 
StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();
         boolean forcedShutDown = false;
 
         try {
-            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
-            this.monitor.submit(monitorThread);
+            if(this.useDeprecatedMonitors) {
+                monitorThread = new LocalStreamProcessMonitorThread(executor, 
10);
+                this.monitor.submit(monitorThread);
+            }
             setupComponentTasks(tasks);
             setupProviderTasks(provTasks);
             LOGGER.info("Started stream with {} components", tasks.size());
@@ -320,10 +322,10 @@ public class LocalStreamBuilder implements StreamBuilder {
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
                 if(this.useDeprecatedMonitors &&  comp.isOperationCountable() 
) {
-                    this.monitor.submit(broadcastMonitor);
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }
+                this.monitor.submit(broadcastMonitor);
             }
             streamsTasks.put(comp.getId(), compTasks);
         }

Reply via email to