STREAMS-252 | Fix issue that was causing some unit tests to fail

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/af7aabf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/af7aabf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/af7aabf4

Branch: refs/heads/master
Commit: af7aabf45c4f938db305f311edda6962bd5140f7
Parents: c466a2c
Author: Robert Douglas <[email protected]>
Authored: Thu Dec 11 16:54:55 2014 -0600
Committer: Robert Douglas <[email protected]>
Committed: Thu Dec 11 16:54:55 2014 -0600

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 35 +++++++++++---------
 1 file changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/af7aabf4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index daef084..9775727 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -22,10 +22,7 @@ import org.apache.streams.core.*;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.queues.ThroughputQueue;
-import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
-import org.apache.streams.local.tasks.StatusCounterMonitorThread;
-import org.apache.streams.local.tasks.StreamsProviderTask;
-import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.tasks.*;
 import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -203,16 +200,14 @@ public class LocalStreamBuilder implements StreamBuilder {
         attachShutdownHandler();
         boolean isRunning = true;
         this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
-        this.monitor = Executors.newCachedThreadPool();
+        this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();
         boolean forcedShutDown = false;
 
         try {
-            if(this.useDeprecatedMonitors) {
-                monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
-                this.monitor.submit(monitorThread);
-            }
+            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
+            this.monitor.submit(monitorThread);
             setupComponentTasks(tasks);
             setupProviderTasks(provTasks);
             LOGGER.info("Started stream with {} components", tasks.size());
@@ -222,7 +217,13 @@ public class LocalStreamBuilder implements StreamBuilder {
                     isRunning = isRunning || task.isRunning();
                 }
                 for(StreamComponent task: components.values()) {
-                    isRunning = isRunning || task.getInBoundQueue().size() > 0;
+                    boolean tasksRunning = false;
+                    for(StreamsTask t : task.getStreamsTasks()) {
+                        if(t instanceof BaseStreamsTask) {
+                            tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning();
+                        }
+                    }
+                    isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0);
                 }
                 if(isRunning) {
                     Thread.sleep(3000);
@@ -321,11 +322,13 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
-                if(this.useDeprecatedMonitors &&  comp.isOperationCountable() ) {
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+                if(comp.isOperationCountable() ) {
+                    if(this.useDeprecatedMonitors) {
+                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+                    }
+                    this.monitor.submit(broadcastMonitor);
                 }
-                this.monitor.submit(broadcastMonitor);
             }
             streamsTasks.put(comp.getId(), compTasks);
         }
@@ -426,7 +429,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     }
 
     protected int getTimeout() {
-    //Set the timeout of it is configured, otherwise signal downstream components to use their default
+        //Set the timeout of it is configured, otherwise signal downstream components to use their default
         return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1;
     }
 
@@ -441,4 +444,4 @@ public class LocalStreamBuilder implements StreamBuilder {
         }
     }
 
-}
+}
\ No newline at end of file

Reply via email to