STREAMS-252 | Fixing NPE issue causing Stream to drop datum in certain circumstances


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4f6f7d5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4f6f7d5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4f6f7d5f

Branch: refs/heads/master
Commit: 4f6f7d5f5ac277289deff43409bb27a99741f0cc
Parents: af7aabf
Author: Robert Douglas <[email protected]>
Authored: Thu Dec 11 17:28:55 2014 -0600
Committer: Robert Douglas <[email protected]>
Committed: Thu Dec 11 17:28:55 2014 -0600

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 22 +++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4f6f7d5f/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 9775727..bef5ed7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -200,14 +200,16 @@ public class LocalStreamBuilder implements StreamBuilder {
         attachShutdownHandler();
         boolean isRunning = true;
         this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
-        this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1);
+        this.monitor = Executors.newCachedThreadPool();
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();
         boolean forcedShutDown = false;
 
         try {
-            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
-            this.monitor.submit(monitorThread);
+            if (this.useDeprecatedMonitors) {
+                monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
+                this.monitor.submit(monitorThread);
+            }
             setupComponentTasks(tasks);
             setupProviderTasks(provTasks);
             LOGGER.info("Started stream with {} components", tasks.size());
@@ -279,7 +281,9 @@ public class LocalStreamBuilder implements StreamBuilder {
 
     protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
         LOGGER.info("Attempting to shutdown tasks");
-        this.monitorThread.shutdown();
+        if (this.monitorThread != null) {
+            this.monitorThread.shutdown();
+        }
         this.executor.shutdown();
         //complete stream shut down gracfully
         for(StreamComponent prov : this.providers.values()) {
@@ -322,13 +326,11 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
-                if(comp.isOperationCountable() ) {
-                    if(this.useDeprecatedMonitors) {
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                    }
-                    this.monitor.submit(broadcastMonitor);
+                if(this.useDeprecatedMonitors &&  comp.isOperationCountable() ) {
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }
+                this.monitor.submit(broadcastMonitor);
             }
             streamsTasks.put(comp.getId(), compTasks);
         }

Reply via email to