STREAMS-252 | Fixing NPE issue causing Stream to drop datum in certain circumstances
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4f6f7d5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4f6f7d5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4f6f7d5f Branch: refs/heads/master Commit: 4f6f7d5f5ac277289deff43409bb27a99741f0cc Parents: af7aabf Author: Robert Douglas <[email protected]> Authored: Thu Dec 11 17:28:55 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Thu Dec 11 17:28:55 2014 -0600 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 22 +++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4f6f7d5f/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 9775727..bef5ed7 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -200,14 +200,16 @@ public class LocalStreamBuilder implements StreamBuilder { attachShutdownHandler(); boolean isRunning = true; this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); - this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1); + this.monitor = Executors.newCachedThreadPool(); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); tasks = new HashMap<String, List<StreamsTask>>(); boolean forcedShutDown = false; try { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); - this.monitor.submit(monitorThread); + if (this.useDeprecatedMonitors) { + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + this.monitor.submit(monitorThread); + } setupComponentTasks(tasks); setupProviderTasks(provTasks); LOGGER.info("Started stream with {} components", tasks.size()); @@ -279,7 +281,9 @@ public class LocalStreamBuilder implements StreamBuilder { protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException { LOGGER.info("Attempting to shutdown tasks"); - this.monitorThread.shutdown(); + if (this.monitorThread != null) { + this.monitorThread.shutdown(); + } this.executor.shutdown(); //complete stream shut down gracfully for(StreamComponent prov : this.providers.values()) { @@ -322,13 +326,11 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.futures.put(task, this.executor.submit(task)); compTasks.add(task); - if(comp.isOperationCountable() ) { - if(this.useDeprecatedMonitors) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - this.monitor.submit(broadcastMonitor); + if(this.useDeprecatedMonitors && comp.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } + this.monitor.submit(broadcastMonitor); } streamsTasks.put(comp.getId(), compTasks); }
