http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index f6fcaac..9e5089e 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -18,25 +18,38 @@
 
 package org.apache.streams.local.builders;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import 
org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.monitoring.MonitoringConfiguration;
 import org.apache.streams.local.queues.ThroughputQueue;
-import org.apache.streams.local.tasks.*;
+import org.apache.streams.local.tasks.BaseStreamsTask;
+import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
+import org.apache.streams.local.tasks.StatusCounterMonitorThread;
+import org.apache.streams.local.tasks.StreamsProviderTask;
+import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 
 import java.math.BigInteger;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,452 +62,452 @@ import java.util.concurrent.TimeUnit;
  */
 public class LocalStreamBuilder implements StreamBuilder {
 
-    private static final Logger LOGGER = 
org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class);
-    private static final int DEFAULT_QUEUE_SIZE = 500;
-
-    public static final String TIMEOUT_KEY = "TIMEOUT";
-    public static final String BROADCAST_KEY = "broadcastURI";
-    public static final String STREAM_IDENTIFIER_KEY = "streamsID";
-    public static final String BROADCAST_INTERVAL_KEY = 
"monitoring_broadcast_interval_ms";
-    public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream";
-    public static final String DEFAULT_STARTED_AT_KEY = "startedAt";
-
-    private Map<String, StreamComponent> providers;
-    private Map<String, StreamComponent> components;
-    private LocalRuntimeConfiguration streamConfig;
-    private Map<StreamsTask, Future> futures;
-    private ExecutorService executor;
-    private ExecutorService monitor;
-    private int totalTasks;
-    private int monitorTasks;
-    private LocalStreamProcessMonitorThread monitorThread;
-    private Map<String, List<StreamsTask>> tasks;
-    private Thread shutdownHook;
-    private BroadcastMonitorThread broadcastMonitor;
-    private int maxQueueCapacity;
-    private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
-    private DateTime startedAt = new DateTime();
-    private boolean useDeprecatedMonitors;
-
-    /**
-     * Creates a local stream builder with all configuration resolved by 
typesafe
-     */
-    public LocalStreamBuilder() {
-        this(new 
ObjectMapper().convertValue(StreamsConfigurator.detectConfiguration(), 
LocalRuntimeConfiguration.class));
+  private static final Logger LOGGER = 
org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class);
+  private static final int DEFAULT_QUEUE_SIZE = 500;
+
+  public static final String TIMEOUT_KEY = "TIMEOUT";
+  public static final String BROADCAST_KEY = "broadcastURI";
+  public static final String STREAM_IDENTIFIER_KEY = "streamsID";
+  public static final String BROADCAST_INTERVAL_KEY = 
"monitoring_broadcast_interval_ms";
+  public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream";
+  public static final String DEFAULT_STARTED_AT_KEY = "startedAt";
+
+  private Map<String, StreamComponent> providers;
+  private Map<String, StreamComponent> components;
+  private LocalRuntimeConfiguration streamConfig;
+  private Map<StreamsTask, Future> futures;
+  private ExecutorService executor;
+  private ExecutorService monitor;
+  private int totalTasks;
+  private int monitorTasks;
+  private LocalStreamProcessMonitorThread monitorThread;
+  private Map<String, List<StreamsTask>> tasks;
+  private Thread shutdownHook;
+  private BroadcastMonitorThread broadcastMonitor;
+  private int maxQueueCapacity;
+  private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
+  private DateTime startedAt = new DateTime();
+  private boolean useDeprecatedMonitors;
+
+  /**
+   * Creates a local stream builder with all configuration resolved by typesafe
+   */
+  public LocalStreamBuilder() {
+    this(new 
ObjectMapper().convertValue(StreamsConfigurator.detectConfiguration(), 
LocalRuntimeConfiguration.class));
+  }
+
+  /**
+   * Creates a local stream builder with a config object and default maximum 
internal queue size of 500
+   * @param streamConfig
+   * @deprecated use LocalRuntimeConfiguration constructor instead
+   */
+  @Deprecated
+  public LocalStreamBuilder(Map<String, Object> streamConfig) {
+    this(DEFAULT_QUEUE_SIZE, streamConfig);
+  }
+
+  /**
+   * Creates a local stream builder with no config object. If maxQueueCapacity 
is less than 1 the queue is
+   * unbounded.
+   * @param maxQueueCapacity
+   *
+   * @deprecated use LocalRuntimeConfiguration constructor instead
+   */
+  @Deprecated
+  public LocalStreamBuilder(int maxQueueCapacity) {
+    this(maxQueueCapacity, null);
+  }
+
+  /**
+   * Creates a local stream builder with a config object. If maxQueueCapacity 
is less than 1 the queue is
+   * unbounded.
+   *
+   * @param maxQueueCapacity
+   * @param streamConfig
+   *
+   * @deprecated use LocalRuntimeConfiguration constructor instead
+   */
+  @Deprecated
+  public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> 
streamConfig) {
+    this(new LocalRuntimeConfiguration());
+    this.streamConfig.setQueueSize(new Long(maxQueueCapacity));
+    if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY) != null )
+      this.streamConfig.setProviderTimeoutMs(new Long((Integer) 
(streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY))));
+    if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null )
+      
this.streamConfig.setIdentifier((String)streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY));
+    if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.BROADCAST_KEY) != null ) {
+      MonitoringConfiguration monitoringConfiguration = new 
MonitoringConfiguration();
+      
monitoringConfiguration.setBroadcastURI((String)streamConfig.get(LocalStreamBuilder.BROADCAST_KEY));
+      if(streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY) != null)
+        
monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong((String)streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY)));
+      this.streamConfig.setMonitoring(monitoringConfiguration);
     }
-
-    /**
-     * Creates a local stream builder with a config object and default maximum 
internal queue size of 500
-     * @param streamConfig
-     * @deprecated use LocalRuntimeConfiguration constructor instead
-     */
-    @Deprecated
-    public LocalStreamBuilder(Map<String, Object> streamConfig) {
-        this(DEFAULT_QUEUE_SIZE, streamConfig);
-    }
-
-    /**
-     * Creates a local stream builder with no config object. If 
maxQueueCapacity is less than 1 the queue is
-     * unbounded.
-     * @param maxQueueCapacity
-     *
-     * @deprecated use LocalRuntimeConfiguration constructor instead
-     */
-    @Deprecated
-    public LocalStreamBuilder(int maxQueueCapacity) {
-        this(maxQueueCapacity, null);
-    }
-
-    /**
-     * Creates a local stream builder with a config object. If 
maxQueueCapacity is less than 1 the queue is
-     * unbounded.
-     *
-     * @param maxQueueCapacity
-     * @param streamConfig
-     *
-     * @deprecated use LocalRuntimeConfiguration constructor instead
-     */
-    @Deprecated
-    public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> 
streamConfig) {
-        this(new LocalRuntimeConfiguration());
-        this.streamConfig.setQueueSize(new Long(maxQueueCapacity));
-        if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY) != null )
-            this.streamConfig.setProviderTimeoutMs(new Long((Integer) 
(streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY))));
-        if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null )
-            
this.streamConfig.setIdentifier((String)streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY));
-        if( streamConfig != null && 
streamConfig.get(LocalStreamBuilder.BROADCAST_KEY) != null ) {
-            MonitoringConfiguration monitoringConfiguration = new 
MonitoringConfiguration();
-            
monitoringConfiguration.setBroadcastURI((String)streamConfig.get(LocalStreamBuilder.BROADCAST_KEY));
-            if(streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY) != 
null)
-                
monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong((String)streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY)));
-            this.streamConfig.setMonitoring(monitoringConfiguration);
+  }
+
+  public LocalStreamBuilder(LocalRuntimeConfiguration streamConfig) {
+    this.streamConfig = streamConfig;
+    this.providers = new HashMap<String, StreamComponent>();
+    this.components = new HashMap<String, StreamComponent>();
+    this.totalTasks = 0;
+    this.monitorTasks = 0;
+    this.futures = new HashMap<>();
+  }
+
+  public void prepare() {
+    this.streamIdentifier = streamConfig.getIdentifier();
+    this.streamConfig.setStartedAt(startedAt.getMillis());
+    final LocalStreamBuilder self = this;
+    this.shutdownHook = new Thread() {
+      @Override
+      public void run() {
+        LOGGER.debug("Shutdown hook received.  Beginning shutdown");
+        self.stopInternal(true);
+      }
+    };
+    this.useDeprecatedMonitors = false;
+    this.broadcastMonitor = new 
BroadcastMonitorThread(this.streamConfig.getMonitoring());
+  }
+
+  public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) {
+    this.useDeprecatedMonitors = useDeprecatedMonitors;
+  }
+
+  @Override
+  public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) 
{
+    validateId(id);
+    this.providers.put(id, new StreamComponent(id, provider, true, 
streamConfig));
+    ++this.totalTasks;
+    if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable )
+      ++this.monitorTasks;
+    return this;
+  }
+
+  @Override
+  public StreamBuilder newReadCurrentStream(String id, StreamsProvider 
provider) {
+    validateId(id);
+    this.providers.put(id, new StreamComponent(id, provider, false, 
streamConfig));
+    ++this.totalTasks;
+    if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable )
+      ++this.monitorTasks;
+    return this;
+  }
+
+  @Override
+  public StreamBuilder newReadNewStream(String id, StreamsProvider provider, 
BigInteger sequence) {
+    validateId(id);
+    this.providers.put(id, new StreamComponent(id, provider, sequence, 
streamConfig));
+    ++this.totalTasks;
+    if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable )
+      ++this.monitorTasks;
+    return this;
+  }
+
+  @Override
+  public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, 
DateTime start, DateTime end) {
+    validateId(id);
+    this.providers.put(id, new StreamComponent(id, provider, start, end, 
streamConfig));
+    ++this.totalTasks;
+    if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable )
+      ++this.monitorTasks;
+    return this;
+  }
+
+  @Override
+  public StreamBuilder setStreamsConfiguration(StreamsConfiguration 
configuration) {
+    streamConfig = 
StreamsJacksonMapper.getInstance().convertValue(configuration, 
LocalRuntimeConfiguration.class);
+    return this;
+  }
+
+  @Override
+  public StreamsConfiguration getStreamsConfiguration() {
+    return StreamsJacksonMapper.getInstance().convertValue(streamConfig, 
StreamsConfiguration.class);
+  }
+
+  @Override
+  public StreamBuilder addStreamsProcessor(String id, StreamsProcessor 
processor, int numTasks, String... inBoundIds) {
+    validateId(id);
+    StreamComponent comp = new StreamComponent(id, processor, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, 
startedAt.getMillis()), numTasks, streamConfig);
+    this.components.put(id, comp);
+    connectToOtherComponents(inBoundIds, comp);
+    this.totalTasks += numTasks;
+    if(this.useDeprecatedMonitors && processor instanceof DatumStatusCountable 
)
+      ++this.monitorTasks;
+    return this;
+  }
+
+  @Override
+  public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter 
writer, int numTasks, String... inBoundIds) {
+    validateId(id);
+    StreamComponent comp = new StreamComponent(id, writer, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, 
startedAt.getMillis()), numTasks, streamConfig);
+    this.components.put(id, comp);
+    connectToOtherComponents(inBoundIds, comp);
+    this.totalTasks += numTasks;
+    if(this.useDeprecatedMonitors && writer instanceof DatumStatusCountable )
+      ++this.monitorTasks;
+    return this;
+  }
+
+  /**
+   * Runs the data stream in the this JVM and blocks till completion.
+   */
+  @Override
+  public void start() {
+    prepare();
+    attachShutdownHandler();
+    boolean isRunning = true;
+    this.executor = new 
ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
+    this.monitor = Executors.newCachedThreadPool();
+    Map<String, StreamsProviderTask> provTasks = new HashMap<String, 
StreamsProviderTask>();
+    tasks = new HashMap<String, List<StreamsTask>>();
+    boolean forcedShutDown = false;
+
+    try {
+      if (this.useDeprecatedMonitors) {
+        monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
+        this.monitor.submit(monitorThread);
+      }
+      setupComponentTasks(tasks);
+      setupProviderTasks(provTasks);
+      LOGGER.info("Started stream with {} components", tasks.size());
+      while(isRunning) {
+        
Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckDelay(), 
TimeUnit.MILLISECONDS);
+        isRunning = false;
+        for(StreamsProviderTask task : provTasks.values()) {
+          isRunning = isRunning || task.isRunning();
         }
-    }
-
-    public LocalStreamBuilder(LocalRuntimeConfiguration streamConfig) {
-        this.streamConfig = streamConfig;
-        this.providers = new HashMap<String, StreamComponent>();
-        this.components = new HashMap<String, StreamComponent>();
-        this.totalTasks = 0;
-        this.monitorTasks = 0;
-        this.futures = new HashMap<>();
-    }
-
-    public void prepare() {
-        this.streamIdentifier = streamConfig.getIdentifier();
-        this.streamConfig.setStartedAt(startedAt.getMillis());
-        final LocalStreamBuilder self = this;
-        this.shutdownHook = new Thread() {
-            @Override
-            public void run() {
-                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
-                self.stopInternal(true);
+        for(StreamComponent task: components.values()) {
+          boolean tasksRunning = false;
+          for(StreamsTask t : task.getStreamsTasks()) {
+            if(t instanceof BaseStreamsTask) {
+              tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning();
             }
-        };
-        this.useDeprecatedMonitors = false;
-        this.broadcastMonitor = new 
BroadcastMonitorThread(this.streamConfig.getMonitoring());
-    }
-
-    public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) {
-        this.useDeprecatedMonitors = useDeprecatedMonitors;
-    }
-
-    @Override
-    public StreamBuilder newPerpetualStream(String id, StreamsProvider 
provider) {
-        validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, true, 
streamConfig));
-        ++this.totalTasks;
-        if(this.useDeprecatedMonitors && provider instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
-    }
-
-    @Override
-    public StreamBuilder newReadCurrentStream(String id, StreamsProvider 
provider) {
-        validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, false, 
streamConfig));
-        ++this.totalTasks;
-        if(this.useDeprecatedMonitors && provider instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
-    }
-
-    @Override
-    public StreamBuilder newReadNewStream(String id, StreamsProvider provider, 
BigInteger sequence) {
-        validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, sequence, 
streamConfig));
-        ++this.totalTasks;
-        if(this.useDeprecatedMonitors && provider instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
-    }
-
-    @Override
-    public StreamBuilder newReadRangeStream(String id, StreamsProvider 
provider, DateTime start, DateTime end) {
-        validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider, start, end, 
streamConfig));
-        ++this.totalTasks;
-        if(this.useDeprecatedMonitors && provider instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
-    }
-
-    @Override
-    public StreamBuilder setStreamsConfiguration(StreamsConfiguration 
configuration) {
-        streamConfig = 
StreamsJacksonMapper.getInstance().convertValue(configuration, 
LocalRuntimeConfiguration.class);
-        return this;
-    }
-
-    @Override
-    public StreamsConfiguration getStreamsConfiguration() {
-        return StreamsJacksonMapper.getInstance().convertValue(streamConfig, 
StreamsConfiguration.class);
+          }
+          isRunning = isRunning || (tasksRunning && 
task.getInBoundQueue().size() > 0);
+        }
+        if(isRunning) {
+          
Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckInterval(), 
TimeUnit.MILLISECONDS);
+        }
+      }
+      LOGGER.info("Components are no longer running or timed out");
+    } catch (Exception e){
+      LOGGER.warn("Runtime exception.  Beginning shutdown");
+      forcedShutDown = true;
+    } finally{
+      LOGGER.info("Stream has completed, pausing @ {}", 
System.currentTimeMillis());
+      Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownPauseMs(), 
TimeUnit.MILLISECONDS);
+      LOGGER.info("Stream has completed, shutting down @ {}", 
System.currentTimeMillis());
+      stopInternal(forcedShutDown);
     }
 
-    @Override
-    public StreamBuilder addStreamsProcessor(String id, StreamsProcessor 
processor, int numTasks, String... inBoundIds) {
-        validateId(id);
-        StreamComponent comp = new StreamComponent(id, processor, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, 
startedAt.getMillis()), numTasks, streamConfig);
-        this.components.put(id, comp);
-        connectToOtherComponents(inBoundIds, comp);
-        this.totalTasks += numTasks;
-        if(this.useDeprecatedMonitors && processor instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
+  }
+
+  private void attachShutdownHandler() {
+    LOGGER.debug("Attaching shutdown handler");
+    Runtime.getRuntime().addShutdownHook(shutdownHook);
+  }
+
+  private void detachShutdownHandler() {
+    LOGGER.debug("Detaching shutdown handler");
+    Runtime.getRuntime().removeShutdownHook(shutdownHook);
+  }
+
+  protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
+    LOGGER.debug("Shutdown failed.  Forcing shutdown");
+    for(List<StreamsTask> tasks : streamsTasks.values()) {
+      for(StreamsTask task : tasks) {
+        task.stopTask();
+        if(task.isWaiting()) {
+          this.futures.get(task).cancel(true);
+        }
+      }
     }
-
-    @Override
-    public StreamBuilder addStreamsPersistWriter(String id, 
StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
-        validateId(id);
-        StreamComponent comp = new StreamComponent(id, writer, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, 
startedAt.getMillis()), numTasks, streamConfig);
-        this.components.put(id, comp);
-        connectToOtherComponents(inBoundIds, comp);
-        this.totalTasks += numTasks;
-        if(this.useDeprecatedMonitors && writer instanceof 
DatumStatusCountable )
-            ++this.monitorTasks;
-        return this;
+    this.executor.shutdown();
+    this.monitor.shutdown();
+    try {
+      
if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownPauseMs(), 
TimeUnit.MILLISECONDS)){
+        this.executor.shutdownNow();
+      }
+      
if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownPauseMs(), 
TimeUnit.MILLISECONDS)){
+        this.monitor.shutdownNow();
+      }
+    }catch (InterruptedException ie) {
+      this.executor.shutdownNow();
+      this.monitor.shutdownNow();
+      throw new RuntimeException(ie);
     }
+  }
 
-    /**
-     * Runs the data stream in the this JVM and blocks till completion.
-     */
-    @Override
-    public void start() {
-        prepare();
-        attachShutdownHandler();
-        boolean isRunning = true;
-        this.executor = new 
ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
-        this.monitor = Executors.newCachedThreadPool();
-        Map<String, StreamsProviderTask> provTasks = new HashMap<String, 
StreamsProviderTask>();
-        tasks = new HashMap<String, List<StreamsTask>>();
-        boolean forcedShutDown = false;
-
-        try {
-            if (this.useDeprecatedMonitors) {
-                monitorThread = new LocalStreamProcessMonitorThread(executor, 
10);
-                this.monitor.submit(monitorThread);
-            }
-            setupComponentTasks(tasks);
-            setupProviderTasks(provTasks);
-            LOGGER.info("Started stream with {} components", tasks.size());
-            while(isRunning) {
-                
Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckDelay(), 
TimeUnit.MILLISECONDS);
-                isRunning = false;
-                for(StreamsProviderTask task : provTasks.values()) {
-                    isRunning = isRunning || task.isRunning();
-                }
-                for(StreamComponent task: components.values()) {
-                    boolean tasksRunning = false;
-                    for(StreamsTask t : task.getStreamsTasks()) {
-                        if(t instanceof BaseStreamsTask) {
-                            tasksRunning = tasksRunning || ((BaseStreamsTask) 
t).isRunning();
-                        }
-                    }
-                    isRunning = isRunning || (tasksRunning && 
task.getInBoundQueue().size() > 0);
-                }
-                if(isRunning) {
-                    
Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckInterval(), 
TimeUnit.MILLISECONDS);
-                }
-            }
-            LOGGER.info("Components are no longer running or timed out");
-        } catch (Exception e){
-            LOGGER.warn("Runtime exception.  Beginning shutdown");
-            forcedShutDown = true;
-        } finally{
-            LOGGER.info("Stream has completed, pausing @ {}", 
System.currentTimeMillis());
-            
Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownPauseMs(), 
TimeUnit.MILLISECONDS);
-            LOGGER.info("Stream has completed, shutting down @ {}", 
System.currentTimeMillis());
-            stopInternal(forcedShutDown);
-        }
-
+  protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws 
InterruptedException {
+    LOGGER.info("Attempting to shutdown tasks");
+    if (this.monitorThread != null) {
+      this.monitorThread.shutdown();
     }
-
-    private void attachShutdownHandler() {
-        LOGGER.debug("Attaching shutdown handler");
-        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    this.executor.shutdown();
+    //complete stream shut down gracfully
+    for(StreamComponent prov : this.providers.values()) {
+      shutDownTask(prov, streamsTasks);
     }
-
-    private void detachShutdownHandler() {
-        LOGGER.debug("Detaching shutdown handler");
-        Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    //need to make this configurable
+    
if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), 
TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
+      this.executor.shutdownNow();
+      this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), 
TimeUnit.MILLISECONDS);
     }
-
-    protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
-        LOGGER.debug("Shutdown failed.  Forcing shutdown");
-        for(List<StreamsTask> tasks : streamsTasks.values()) {
-            for(StreamsTask task : tasks) {
-                task.stopTask();
-                if(task.isWaiting()) {
-                    this.futures.get(task).cancel(true);
-                }
-            }
-        }
-        this.executor.shutdown();
-        this.monitor.shutdown();
-        try {
-            
if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownPauseMs(), 
TimeUnit.MILLISECONDS)){
-                this.executor.shutdownNow();
-            }
-            
if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownPauseMs(), 
TimeUnit.MILLISECONDS)){
-                this.monitor.shutdownNow();
-            }
-        }catch (InterruptedException ie) {
-            this.executor.shutdownNow();
-            this.monitor.shutdownNow();
-            throw new RuntimeException(ie);
-        }
+    if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), 
TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
+      this.monitor.shutdownNow();
+      this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), 
TimeUnit.MILLISECONDS);
     }
-
-    protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) 
throws InterruptedException {
-        LOGGER.info("Attempting to shutdown tasks");
-        if (this.monitorThread != null) {
-            this.monitorThread.shutdown();
-        }
-        this.executor.shutdown();
-        //complete stream shut down gracfully
-        for(StreamComponent prov : this.providers.values()) {
-            shutDownTask(prov, streamsTasks);
-        }
-        //need to make this configurable
-        
if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), 
TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
-            this.executor.shutdownNow();
-            
this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), 
TimeUnit.MILLISECONDS);
-        }
-        
if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), 
TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
-            this.monitor.shutdownNow();
-            
this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), 
TimeUnit.MILLISECONDS);
-        }
+  }
+
+  protected void setupProviderTasks(Map<String, StreamsProviderTask> 
provTasks) {
+    for(StreamComponent prov : this.providers.values()) {
+      StreamsTask task = prov.createConnectedTask(getTimeout());
+      task.setStreamConfig(this.streamConfig);
+      StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), 
streamIdentifier, startedAt.getMillis());
+      task.setStreamsTaskCounter(counter);
+      this.executor.submit(task);
+      provTasks.put(prov.getId(), (StreamsProviderTask) task);
+      if(this.useDeprecatedMonitors && prov.isOperationCountable() ) {
+        this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+        this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
+      }
     }
-
-    protected void setupProviderTasks(Map<String, StreamsProviderTask> 
provTasks) {
-        for(StreamComponent prov : this.providers.values()) {
-            StreamsTask task = prov.createConnectedTask(getTimeout());
-            task.setStreamConfig(this.streamConfig);
-            StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), 
streamIdentifier, startedAt.getMillis());
-            task.setStreamsTaskCounter(counter);
-            this.executor.submit(task);
-            provTasks.put(prov.getId(), (StreamsProviderTask) task);
-            if(this.useDeprecatedMonitors && prov.isOperationCountable() ) {
-                this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
-                this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-            }
+  }
+
+  protected void setupComponentTasks(Map<String, List<StreamsTask>> 
streamsTasks) {
+    for(StreamComponent comp : this.components.values()) {
+      int tasks = comp.getNumTasks();
+      List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+      StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), 
streamIdentifier, startedAt.getMillis());
+      for(int i=0; i < tasks; ++i) {
+        StreamsTask task = comp.createConnectedTask(getTimeout());
+        task.setStreamsTaskCounter(counter);
+        task.setStreamConfig(this.streamConfig);
+        this.futures.put(task, this.executor.submit(task));
+        compTasks.add(task);
+        if(this.useDeprecatedMonitors &&  comp.isOperationCountable() ) {
+          this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+          this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
         }
+        this.monitor.submit(broadcastMonitor);
+      }
+      streamsTasks.put(comp.getId(), compTasks);
     }
-
-    protected void setupComponentTasks(Map<String, List<StreamsTask>> 
streamsTasks) {
-        for(StreamComponent comp : this.components.values()) {
-            int tasks = comp.getNumTasks();
-            List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
-            StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), 
streamIdentifier, startedAt.getMillis());
-            for(int i=0; i < tasks; ++i) {
-                StreamsTask task = comp.createConnectedTask(getTimeout());
-                task.setStreamsTaskCounter(counter);
-                task.setStreamConfig(this.streamConfig);
-                this.futures.put(task, this.executor.submit(task));
-                compTasks.add(task);
-                if(this.useDeprecatedMonitors &&  comp.isOperationCountable() 
) {
-                    this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
-                    this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                }
-                this.monitor.submit(broadcastMonitor);
-            }
-            streamsTasks.put(comp.getId(), compTasks);
+  }
+
+  /**
+   * Shutsdown the running tasks in sudo depth first search kind of way. 
Checks that the upstream components have
+   * finished running before shutting down. Waits till inbound queue is empty 
to shutdown.
+   * @param comp StreamComponent to shut down.
+   * @param streamTasks the list of non-StreamsProvider tasks for this stream.
+   * @throws InterruptedException
+   */
+  private void shutDownTask(StreamComponent comp, Map<String, 
List<StreamsTask>> streamTasks) throws InterruptedException {
+    List<StreamsTask> tasks = streamTasks.get(comp.getId());
+    if(tasks != null) { //not a StreamProvider
+      boolean parentsShutDown = true;
+      for(StreamComponent parent : comp.getUpStreamComponents()) {
+        List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
+        //if parentTask == null, its a provider and is not running anymore
+        if(parentTasks != null) {
+          for(StreamsTask task : parentTasks) {
+            parentsShutDown = parentsShutDown && !task.isRunning();
+          }
         }
-    }
-
-    /**
-     * Shutsdown the running tasks in sudo depth first search kind of way. 
Checks that the upstream components have
-     * finished running before shutting down. Waits till inbound queue is 
empty to shutdown.
-     * @param comp StreamComponent to shut down.
-     * @param streamTasks the list of non-StreamsProvider tasks for this 
stream.
-     * @throws InterruptedException
-     */
-    private void shutDownTask(StreamComponent comp, Map<String, 
List<StreamsTask>> streamTasks) throws InterruptedException {
-        List<StreamsTask> tasks = streamTasks.get(comp.getId());
-        if(tasks != null) { //not a StreamProvider
-            boolean parentsShutDown = true;
-            for(StreamComponent parent : comp.getUpStreamComponents()) {
-                List<StreamsTask> parentTasks = 
streamTasks.get(parent.getId());
-                //if parentTask == null, its a provider and is not running 
anymore
-                if(parentTasks != null) {
-                    for(StreamsTask task : parentTasks) {
-                        parentsShutDown = parentsShutDown && !task.isRunning();
-                    }
-                }
-            }
-            if(parentsShutDown) {
-                for(StreamsTask task : tasks) {
-                    task.stopTask();
-                    if(task.isWaiting()) {
-                        this.futures.get(task).cancel(true); // no data to 
process, interrupt block queue
-                    }
-                }
-                for(StreamsTask task : tasks) {
-                    int count = 0;
-                    while(count < streamConfig.getTaskTimeoutMs() / 1000 && 
task.isRunning()) {
-                        Uninterruptibles.sleepUninterruptibly(1, 
TimeUnit.SECONDS);
-                        count++;
-                    }
-
-                    if(task.isRunning()) {
-                        LOGGER.warn("Task {} failed to terminate in allotted 
timeframe", task.toString());
-                    }
-                }
-            }
+      }
+      if(parentsShutDown) {
+        for(StreamsTask task : tasks) {
+          task.stopTask();
+          if(task.isWaiting()) {
+            this.futures.get(task).cancel(true); // no data to process, 
interrupt block queue
+          }
         }
-        Collection<StreamComponent> children = comp.getDownStreamComponents();
-        if(children != null) {
-            for(StreamComponent child : comp.getDownStreamComponents()) {
-                shutDownTask(child, streamTasks);
-            }
+        for(StreamsTask task : tasks) {
+          int count = 0;
+          while(count < streamConfig.getTaskTimeoutMs() / 1000 && 
task.isRunning()) {
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            count++;
+          }
+
+          if(task.isRunning()) {
+            LOGGER.warn("Task {} failed to terminate in allotted timeframe", 
task.toString());
+          }
         }
+      }
     }
-
-    /**
-     * NOT IMPLEMENTED.
-     */
-    @Override
-    public void stop() {
-        stopInternal(false);
-    }
-
-
-
-    protected void stopInternal(boolean systemExiting) {
-        try {
-            shutdown(tasks);
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to shutdown Stream: {}", e);
-            forceShutdown(tasks);
-        } finally {
-            try {
-                 if(!systemExiting) {
-                      detachShutdownHandler();
-                  }
-               } catch( Throwable e3 ) {
-                 LOGGER.error("StopInternal caught Throwable: {}", e3);
-                       System.exit(1);
-            }
-        }
+    Collection<StreamComponent> children = comp.getDownStreamComponents();
+    if(children != null) {
+      for(StreamComponent child : comp.getDownStreamComponents()) {
+        shutDownTask(child, streamTasks);
+      }
     }
-
-    private void connectToOtherComponents(String[] conntectToIds, 
StreamComponent toBeConnected) {
-        for(String id : conntectToIds) {
-            StreamComponent upStream = null;
-            if(this.providers.containsKey(id)) {
-                upStream = this.providers.get(id);
-            }
-            else if(this.components.containsKey(id)) {
-                upStream = this.components.get(id);
-            }
-            else {
-                throw new InvalidStreamException("Cannot connect to id, 
"+id+", because id does not exist.");
-            }
-            upStream.addOutBoundQueue(toBeConnected, 
toBeConnected.getInBoundQueue());
-            toBeConnected.addInboundQueue(upStream);
+  }
+
+  /**
+   * NOT IMPLEMENTED.
+   */
+  @Override
+  public void stop() {
+    stopInternal(false);
+  }
+
+
+
+  protected void stopInternal(boolean systemExiting) {
+    try {
+      shutdown(tasks);
+    } catch (Exception e) {
+      LOGGER.error("Exception while trying to shutdown Stream: {}", e);
+      forceShutdown(tasks);
+    } finally {
+      try {
+        if(!systemExiting) {
+          detachShutdownHandler();
         }
+      } catch( Throwable e3 ) {
+        LOGGER.error("StopInternal caught Throwable: {}", e3);
+        System.exit(1);
+      }
     }
-
-    private void validateId(String id) {
-        if(this.providers.containsKey(id) || this.components.containsKey(id)) {
-            throw new InvalidStreamException("Duplicate id. "+id+" is already 
assigned to another component");
-        } else if(id.contains(":")) {
-            throw new InvalidStreamException("Invalid character, ':', in 
component id : "+id);
-        }
+  }
+
+  private void connectToOtherComponents(String[] conntectToIds, 
StreamComponent toBeConnected) {
+    for(String id : conntectToIds) {
+      StreamComponent upStream = null;
+      if(this.providers.containsKey(id)) {
+        upStream = this.providers.get(id);
+      }
+      else if(this.components.containsKey(id)) {
+        upStream = this.components.get(id);
+      }
+      else {
+        throw new InvalidStreamException("Cannot connect to id, "+id+", 
because id does not exist.");
+      }
+      upStream.addOutBoundQueue(toBeConnected, 
toBeConnected.getInBoundQueue());
+      toBeConnected.addInboundQueue(upStream);
     }
+  }
 
-    protected int getTimeout() {
-        //Set the timeout of it is configured, otherwise signal downstream 
components to use their default
-        return streamConfig.getProviderTimeoutMs().intValue();
+  private void validateId(String id) {
+    if(this.providers.containsKey(id) || this.components.containsKey(id)) {
+      throw new InvalidStreamException("Duplicate id. "+id+" is already 
assigned to another component");
+    } else if(id.contains(":")) {
+      throw new InvalidStreamException("Invalid character, ':', in component 
id : "+id);
     }
-
-    private LocalRuntimeConfiguration convertConfiguration(Map<String, Object> 
streamConfig) {
-        LocalRuntimeConfiguration config = new LocalRuntimeConfiguration();
-        if( streamConfig != null ) {
-            for( Map.Entry<String, Object> item : streamConfig.entrySet() ) {
-                config.setAdditionalProperty(item.getKey(), item.getValue());
-            }
-        }
-        return config;
+  }
+
+  protected int getTimeout() {
+    //Set the timeout of it is configured, otherwise signal downstream 
components to use their default
+    return streamConfig.getProviderTimeoutMs().intValue();
+  }
+
+  private LocalRuntimeConfiguration convertConfiguration(Map<String, Object> 
streamConfig) {
+    LocalRuntimeConfiguration config = new LocalRuntimeConfiguration();
+    if( streamConfig != null ) {
+      for( Map.Entry<String, Object> item : streamConfig.entrySet() ) {
+        config.setAdditionalProperty(item.getKey(), item.getValue());
+      }
     }
+    return config;
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 9d602cd..75799b7 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -18,19 +18,30 @@
 
 package org.apache.streams.local.builders;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsOperation;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.local.tasks.StreamsPersistWriterTask;
 import org.apache.streams.local.tasks.StreamsProcessorTask;
 import org.apache.streams.local.tasks.StreamsProviderTask;
 import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -39,263 +50,263 @@ import java.util.concurrent.BlockingQueue;
  */
 public class StreamComponent implements Serializable {
 
-    private static final int START = 1;
-    private static final int END = 2;
+  private static final int START = 1;
+  private static final int END = 2;
 
-    private String id;
-    private Set<StreamComponent> inBound;
-    private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound;
-    private BlockingQueue<StreamsDatum> inQueue;
-    private StreamsProvider provider;
-    private StreamsProcessor processor;
-    private StreamsPersistWriter writer;
-    private DateTime[] dateRange;
-    private BigInteger sequence;
-    private int numTasks = 1;
-    private boolean perpetual;
+  private String id;
+  private Set<StreamComponent> inBound;
+  private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound;
+  private BlockingQueue<StreamsDatum> inQueue;
+  private StreamsProvider provider;
+  private StreamsProcessor processor;
+  private StreamsPersistWriter writer;
+  private DateTime[] dateRange;
+  private BigInteger sequence;
+  private int numTasks = 1;
+  private boolean perpetual;
 
-    private List<StreamsTask> tasks;
+  private List<StreamsTask> tasks;
 
-    private StreamsConfiguration streamConfig;
+  private StreamsConfiguration streamConfig;
 
-    /**
-     *
-     * @param id
-     * @param provider
-     */
-    public StreamComponent(String id, StreamsProvider provider, boolean 
perpetual, StreamsConfiguration streamConfig) {
-        this.id = id;
-        this.provider = provider;
-        this.perpetual = perpetual;
-        this.streamConfig = streamConfig;
-        initializePrivateVariables();
-    }
+  /**
+   *
+   * @param id
+   * @param provider
+   */
+  public StreamComponent(String id, StreamsProvider provider, boolean 
perpetual, StreamsConfiguration streamConfig) {
+    this.id = id;
+    this.provider = provider;
+    this.perpetual = perpetual;
+    this.streamConfig = streamConfig;
+    initializePrivateVariables();
+  }
 
-    /**
-     *
-     * @param id
-     * @param provider
-     * @param start
-     * @param end
-     */
-    public StreamComponent(String id, StreamsProvider provider, DateTime 
start, DateTime end, StreamsConfiguration streamConfig) {
-        this.id = id;
-        this.provider = provider;
-        this.dateRange = new DateTime[2];
-        this.dateRange[START] = start;
-        this.dateRange[END] = end;
-        this.streamConfig = streamConfig;
-        initializePrivateVariables();
-    }
+  /**
+   *
+   * @param id
+   * @param provider
+   * @param start
+   * @param end
+   */
+  public StreamComponent(String id, StreamsProvider provider, DateTime start, 
DateTime end, StreamsConfiguration streamConfig) {
+    this.id = id;
+    this.provider = provider;
+    this.dateRange = new DateTime[2];
+    this.dateRange[START] = start;
+    this.dateRange[END] = end;
+    this.streamConfig = streamConfig;
+    initializePrivateVariables();
+  }
 
 
-    /**
-     *
-     * @param id
-     * @param provider
-     * @param sequence
-     */
-    public StreamComponent(String id, StreamsProvider provider, BigInteger 
sequence, StreamsConfiguration streamConfig) {
-        this.id = id;
-        this.provider = provider;
-        this.sequence = sequence;
-        this.streamConfig = streamConfig;
-    }
+  /**
+   *
+   * @param id
+   * @param provider
+   * @param sequence
+   */
+  public StreamComponent(String id, StreamsProvider provider, BigInteger 
sequence, StreamsConfiguration streamConfig) {
+    this.id = id;
+    this.provider = provider;
+    this.sequence = sequence;
+    this.streamConfig = streamConfig;
+  }
 
-    /**
-     *
-     * @param id
-     * @param processor
-     * @param inQueue
-     * @param numTasks
-     */
-    public StreamComponent(String id, StreamsProcessor processor, 
BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration 
streamConfig) {
-        this.id = id;
-        this.processor = processor;
-        this.inQueue = inQueue;
-        this.numTasks = numTasks;
-        this.streamConfig = streamConfig;
-        initializePrivateVariables();
-    }
+  /**
+   *
+   * @param id
+   * @param processor
+   * @param inQueue
+   * @param numTasks
+   */
+  public StreamComponent(String id, StreamsProcessor processor, 
BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration 
streamConfig) {
+    this.id = id;
+    this.processor = processor;
+    this.inQueue = inQueue;
+    this.numTasks = numTasks;
+    this.streamConfig = streamConfig;
+    initializePrivateVariables();
+  }
 
-    /**
-     *
-     * @param id
-     * @param writer
-     * @param inQueue
-     * @param numTasks
-     */
-    public StreamComponent(String id, StreamsPersistWriter writer, 
BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration 
streamConfig) {
-        this.id = id;
-        this.writer = writer;
-        this.inQueue = inQueue;
-        this.numTasks = numTasks;
-        this.streamConfig = streamConfig;
-        initializePrivateVariables();
-    }
+  /**
+   *
+   * @param id
+   * @param writer
+   * @param inQueue
+   * @param numTasks
+   */
+  public StreamComponent(String id, StreamsPersistWriter writer, 
BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration 
streamConfig) {
+    this.id = id;
+    this.writer = writer;
+    this.inQueue = inQueue;
+    this.numTasks = numTasks;
+    this.streamConfig = streamConfig;
+    initializePrivateVariables();
+  }
 
-    private void initializePrivateVariables() {
-        this.inBound = new HashSet<StreamComponent>();
-        this.outBound = new HashMap<StreamComponent, 
BlockingQueue<StreamsDatum>>();
-        this.tasks = Lists.newArrayList();
-    }
+  private void initializePrivateVariables() {
+    this.inBound = new HashSet<StreamComponent>();
+    this.outBound = new HashMap<StreamComponent, 
BlockingQueue<StreamsDatum>>();
+    this.tasks = Lists.newArrayList();
+  }
 
-    /**
-     * Add an outbound queue for this component. The queue should be an 
inbound queue of a downstream component.
-     * @param component the component that this supplying their inbound queue
-     * @param queue the queue to to put post processed/provided datums on
-     */
-    public void addOutBoundQueue(StreamComponent component, 
BlockingQueue<StreamsDatum> queue) {
-        this.outBound.put(component, queue);
-    }
+  /**
+   * Add an outbound queue for this component. The queue should be an inbound 
queue of a downstream component.
+   * @param component the component that this supplying their inbound queue
+   * @param queue the queue to to put post processed/provided datums on
+   */
+  public void addOutBoundQueue(StreamComponent component, 
BlockingQueue<StreamsDatum> queue) {
+    this.outBound.put(component, queue);
+  }
 
-    /**
-     * Add a component that supplies data through the inbound queue.
-     * @param component that supplies data through the inbound queue
-     */
-    public void addInboundQueue(StreamComponent component) {
-        this.inBound.add(component);
-    }
+  /**
+   * Add a component that supplies data through the inbound queue.
+   * @param component that supplies data through the inbound queue
+   */
+  public void addInboundQueue(StreamComponent component) {
+    this.inBound.add(component);
+  }
 
-    /**
-     * The components that are immediately downstream of this component (aka 
child nodes)
-     * @return Collection of child nodes of this component
-     */
-    public Collection<StreamComponent> getDownStreamComponents() {
-        return this.outBound.keySet();
-    }
+  /**
+   * The components that are immediately downstream of this component (aka 
child nodes)
+   * @return Collection of child nodes of this component
+   */
+  public Collection<StreamComponent> getDownStreamComponents() {
+    return this.outBound.keySet();
+  }
 
-    /**
-     * The components that are immediately upstream of this component (aka 
parent nodes)
-     * @return Collection of parent nodes of this component
-     */
-    public Collection<StreamComponent> getUpStreamComponents() {
-        return this.inBound;
-    }
+  /**
+   * The components that are immediately upstream of this component (aka 
parent nodes)
+   * @return Collection of parent nodes of this component
+   */
+  public Collection<StreamComponent> getUpStreamComponents() {
+    return this.inBound;
+  }
 
-    /**
-     * The inbound queue for this component
-     * @return inbound queue
-     */
-    public BlockingQueue<StreamsDatum> getInBoundQueue() {
-        return this.inQueue;
-    }
+  /**
+   * The inbound queue for this component
+   * @return inbound queue
+   */
+  public BlockingQueue<StreamsDatum> getInBoundQueue() {
+    return this.inQueue;
+  }
 
-    /**
-     * The number of tasks this to run this component
-     * @return
-     */
-    public int getNumTasks() {
-        return this.numTasks;
-    }
+  /**
+   * The number of tasks this to run this component
+   * @return
+   */
+  public int getNumTasks() {
+    return this.numTasks;
+  }
 
-    /**
-     * Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is 
running a clone of this component whose
-     * inbound and outbound queues are appropriately connected to the parent 
and child nodes.
-     *
-     * @return StreamsTask for this component
-     * @param timeout The timeout to use in milliseconds for any tasks that 
support configurable timeout
-     */
-    public StreamsTask createConnectedTask(int timeout) {
-        StreamsTask task;
-        if(this.processor != null) {
-            if(this.numTasks > 1) {
-                task =  new 
StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor),
 streamConfig);
-                task.addInputQueue(this.inQueue);
-                for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
-                    task.addOutputQueue(q);
-                }
-            } else {
-                task = new StreamsProcessorTask(this.processor, streamConfig);
-                task.addInputQueue(this.inQueue);
-                for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
-                    task.addOutputQueue(q);
-                }
-            }
-        }
-        else if(this.writer != null) {
-            if(this.numTasks > 1) {
-                task = new StreamsPersistWriterTask((StreamsPersistWriter) 
SerializationUtil.cloneBySerialization(this.writer), streamConfig);
-                task.addInputQueue(this.inQueue);
-            } else {
-                task = new StreamsPersistWriterTask(this.writer, streamConfig);
-                task.addInputQueue(this.inQueue);
-            }
-        }
-        else if(this.provider != null) {
-            StreamsProvider prov;
-            if(this.numTasks > 1) {
-                prov = 
(StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
-            } else {
-                prov = this.provider;
-            }
-            if(this.dateRange == null && this.sequence == null)
-                task = new StreamsProviderTask(prov, this.perpetual, 
streamConfig);
-            else if(this.sequence != null)
-                task = new StreamsProviderTask(prov, this.sequence, 
streamConfig);
-            else
-                task = new StreamsProviderTask(prov, this.dateRange[0], 
this.dateRange[1], streamConfig);
-            //Adjust the timeout if necessary
-            if(timeout != 0) {
-                ((StreamsProviderTask)task).setTimeout(timeout);
-            }
-            for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
-                task.addOutputQueue(q);
-            }
-        }
-        else {
-            throw new InvalidStreamException("Underlying StreamComponoent was 
NULL.");
+  /**
+   * Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is 
running a clone of this component whose
+   * inbound and outbound queues are appropriately connected to the parent and 
child nodes.
+   *
+   * @return StreamsTask for this component
+   * @param timeout The timeout to use in milliseconds for any tasks that 
support configurable timeout
+   */
+  public StreamsTask createConnectedTask(int timeout) {
+    StreamsTask task;
+    if(this.processor != null) {
+      if(this.numTasks > 1) {
+        task =  new 
StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor),
 streamConfig);
+        task.addInputQueue(this.inQueue);
+        for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
+          task.addOutputQueue(q);
         }
-
-        if(task != null) {
-            tasks.add(task);
+      } else {
+        task = new StreamsProcessorTask(this.processor, streamConfig);
+        task.addInputQueue(this.inQueue);
+        for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
+          task.addOutputQueue(q);
         }
-
-        return task;
+      }
     }
-
-    public List<StreamsTask> getStreamsTasks() {
-        return this.tasks;
+    else if(this.writer != null) {
+      if(this.numTasks > 1) {
+        task = new StreamsPersistWriterTask((StreamsPersistWriter) 
SerializationUtil.cloneBySerialization(this.writer), streamConfig);
+        task.addInputQueue(this.inQueue);
+      } else {
+        task = new StreamsPersistWriterTask(this.writer, streamConfig);
+        task.addInputQueue(this.inQueue);
+      }
     }
-
-    /**
-     * The unique of this component
-     * @return
-     */
-    public String getId() {
-        return this.id;
+    else if(this.provider != null) {
+      StreamsProvider prov;
+      if(this.numTasks > 1) {
+        prov = 
(StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+      } else {
+        prov = this.provider;
+      }
+      if(this.dateRange == null && this.sequence == null)
+        task = new StreamsProviderTask(prov, this.perpetual, streamConfig);
+      else if(this.sequence != null)
+        task = new StreamsProviderTask(prov, this.sequence, streamConfig);
+      else
+        task = new StreamsProviderTask(prov, this.dateRange[0], 
this.dateRange[1], streamConfig);
+      //Adjust the timeout if necessary
+      if(timeout != 0) {
+        ((StreamsProviderTask)task).setTimeout(timeout);
+      }
+      for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
+        task.addOutputQueue(q);
+      }
     }
-
-    @Override
-    public int hashCode() {
-        return this.id.hashCode();
+    else {
+      throw new InvalidStreamException("Underlying StreamComponoent was 
NULL.");
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if(o instanceof StreamComponent)
-            return this.id.equals(((StreamComponent) o).id);
-        else
-            return false;
+    if(task != null) {
+      tasks.add(task);
     }
 
-    protected StreamsOperation getOperation() {
-        if(this.processor != null) {
-            return (StreamsOperation) this.processor;
-        }
-        else if(this.writer != null) {
-            return (StreamsOperation) this.writer;
-        }
-        else if(this.provider != null) {
-            return (StreamsOperation) this.provider;
-        }
-        else {
-            throw new InvalidStreamException("Underlying StreamComponoent was 
NULL.");
-        }
-    }
+    return task;
+  }
+
+  public List<StreamsTask> getStreamsTasks() {
+    return this.tasks;
+  }
+
+  /**
+   * The unique of this component
+   * @return
+   */
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public int hashCode() {
+    return this.id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if(o instanceof StreamComponent)
+      return this.id.equals(((StreamComponent) o).id);
+    else
+      return false;
+  }
 
-    @Deprecated
-    protected boolean isOperationCountable() {
-        return getOperation() instanceof DatumStatusCountable;
+  protected StreamsOperation getOperation() {
+    if(this.processor != null) {
+      return (StreamsOperation) this.processor;
     }
+    else if(this.writer != null) {
+      return (StreamsOperation) this.writer;
+    }
+    else if(this.provider != null) {
+      return (StreamsOperation) this.provider;
+    }
+    else {
+      throw new InvalidStreamException("Underlying StreamComponoent was 
NULL.");
+    }
+  }
+
+  @Deprecated
+  protected boolean isOperationCountable() {
+    return getOperation() instanceof DatumStatusCountable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
index 34d2bcc..e2884d0 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -17,14 +17,13 @@
  */
 package org.apache.streams.local.counters;
 
-import net.jcip.annotations.ThreadSafe;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.util.ComponentUtils;
+
+import net.jcip.annotations.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.*;
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -33,56 +32,56 @@ import java.util.concurrent.atomic.AtomicLong;
 @ThreadSafe
 public class DatumStatusCounter implements DatumStatusCounterMXBean{
 
-    public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s";
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DatumStatusCounter.class);
+  public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DatumStatusCounter.class);
 
-    private AtomicLong failed;
-    private AtomicLong passed;
+  private AtomicLong failed;
+  private AtomicLong passed;
 
-    public DatumStatusCounter(String id) {
-        this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
-    }
+  public DatumStatusCounter(String id) {
+    this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  }
 
-    public DatumStatusCounter(String id, String streamIdentifier, long 
startedAt) {
-        this.failed = new AtomicLong(0);
-        this.passed = new AtomicLong(0);
-        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, 
streamIdentifier, startedAt), this);
-    }
+  public DatumStatusCounter(String id, String streamIdentifier, long 
startedAt) {
+    this.failed = new AtomicLong(0);
+    this.passed = new AtomicLong(0);
+    ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, 
streamIdentifier, startedAt), this);
+  }
 
-    public void incrementFailedCount() {
-        this.incrementFailedCount(1);
-    }
+  public void incrementFailedCount() {
+    this.incrementFailedCount(1);
+  }
 
-    public void incrementFailedCount(long delta) {
-        this.failed.addAndGet(delta);
-    }
+  public void incrementFailedCount(long delta) {
+    this.failed.addAndGet(delta);
+  }
 
-    public void incrementPassedCount() {
-        this.incrementPassedCount(1);
-    }
+  public void incrementPassedCount() {
+    this.incrementPassedCount(1);
+  }
 
-    public void incrementPassedCount(long delta) {
-        this.passed.addAndGet(delta);
-    }
+  public void incrementPassedCount(long delta) {
+    this.passed.addAndGet(delta);
+  }
 
 
-    @Override
-    public double getFailRate() {
-        double failed = this.failed.get();
-        double passed = this.passed.get();
-        if(failed == 0.0 && passed == 0) {
-            return 0.0;
-        }
-        return failed / (passed + failed);
+  @Override
+  public double getFailRate() {
+    double failed = this.failed.get();
+    double passed = this.passed.get();
+    if(failed == 0.0 && passed == 0) {
+      return 0.0;
     }
+    return failed / (passed + failed);
+  }
 
-    @Override
-    public long getNumFailed() {
-        return this.failed.get();
-    }
+  @Override
+  public long getNumFailed() {
+    return this.failed.get();
+  }
 
-    @Override
-    public long getNumPassed() {
-        return this.passed.get();
-    }
+  @Override
+  public long getNumPassed() {
+    return this.passed.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
index 7cc8df4..3a318a8 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
@@ -22,22 +22,22 @@ package org.apache.streams.local.counters;
  */
 public interface DatumStatusCounterMXBean {
 
-    /**
-     * Get number of failed datums
-     * @return number of failed datums
-     */
-    public long getNumFailed();
+  /**
+   * Get number of failed datums
+   * @return number of failed datums
+   */
+  public long getNumFailed();
 
-    /**
-     * Get number of passed datums
-     * @return number of passed datums
-     */
-    public long getNumPassed();
+  /**
+   * Get number of passed datums
+   * @return number of passed datums
+   */
+  public long getNumPassed();
 
-    /**
-     * Get the failure rate.  Calculated by num failed divided by (num passed 
+ num failed)
-     * @return the failure rate
-     */
-    public double getFailRate();
+  /**
+   * Get the failure rate.  Calculated by num failed divided by (num passed + 
num failed)
+   * @return the failure rate
+   */
+  public double getFailRate();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index 9bd5d49..de37f1b 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -17,16 +17,14 @@
  */
 package org.apache.streams.local.counters;
 
-import net.jcip.annotations.GuardedBy;
-import net.jcip.annotations.ThreadSafe;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
+
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.*;
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -35,133 +33,133 @@ import java.util.concurrent.atomic.AtomicLong;
 @ThreadSafe
 public class StreamsTaskCounter implements StreamsTaskCounterMXBean {
 
-    public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s";
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamsTaskCounter.class);
-
-    private AtomicLong emitted;
-    private AtomicLong received;
-    private AtomicLong errors;
-    private AtomicLong totalTime;
-    @GuardedBy("this")
-    private volatile long maxTime;
-
-    /**
-     *
-     * @param id
-     */
-    public StreamsTaskCounter(String id) {
-        this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
-    }
-
-    /**
-     *
-     * @param id
-     */
-    public StreamsTaskCounter(String id, String streamId, long startedAt) {
-        this.emitted = new AtomicLong(0);
-        this.received = new AtomicLong(0);
-        this.errors = new AtomicLong(0);
-        this.totalTime = new AtomicLong(0);
-        this.maxTime = -1;
-        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, 
streamId, startedAt), this);
+  public static final String NAME_TEMPLATE = 
"org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamsTaskCounter.class);
+
+  private AtomicLong emitted;
+  private AtomicLong received;
+  private AtomicLong errors;
+  private AtomicLong totalTime;
+  @GuardedBy("this")
+  private volatile long maxTime;
+
+  /**
+   *
+   * @param id
+   */
+  public StreamsTaskCounter(String id) {
+    this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+  }
+
+  /**
+   *
+   * @param id
+   */
+  public StreamsTaskCounter(String id, String streamId, long startedAt) {
+    this.emitted = new AtomicLong(0);
+    this.received = new AtomicLong(0);
+    this.errors = new AtomicLong(0);
+    this.totalTime = new AtomicLong(0);
+    this.maxTime = -1;
+    ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, 
streamId, startedAt), this);
+  }
+
+  /**
+   * Increment emitted count
+   */
+  public void incrementEmittedCount() {
+    this.incrementEmittedCount(1);
+  }
+
+  /**
+   * Increment emitted count
+   * @param delta
+   */
+  public void incrementEmittedCount(long delta) {
+    this.emitted.addAndGet(delta);
+  }
+
+  /**
+   * Increment error count
+   */
+  public void incrementErrorCount() {
+    this.incrementErrorCount(1);
+  }
+
+  /**
+   * Increment error count
+   * @param delta
+   */
+  public void incrementErrorCount(long delta) {
+    this.errors.addAndGet(delta);
+  }
+
+  /**
+   * Increment received count
+   */
+  public void incrementReceivedCount() {
+    this.incrementReceivedCount(1);
+  }
+
+  /**
+   * Increment received count
+   * @param delta
+   */
+  public void incrementReceivedCount(long delta) {
+    this.received.addAndGet(delta);
+  }
+
+  /**
+   * Add the time it takes to process a single datum in milliseconds
+   * @param processTime
+   */
+  public void addTime(long processTime) {
+    synchronized (this) {
+      if(processTime > this.maxTime) {
+        this.maxTime = processTime;
+      }
     }
+    this.totalTime.addAndGet(processTime);
+  }
 
-    /**
-     * Increment emitted count
-     */
-    public void incrementEmittedCount() {
-        this.incrementEmittedCount(1);
+  @Override
+  public double getErrorRate() {
+    if(this.received.get() == 0) {
+      return 0.0;
     }
-
-    /**
-     * Increment emitted count
-     * @param delta
-     */
-    public void incrementEmittedCount(long delta) {
-        this.emitted.addAndGet(delta);
+    return (double) this.errors.get() / (double) this.received.get();
+  }
+
+  @Override
+  public long getNumEmitted() {
+    return this.emitted.get();
+  }
+
+  @Override
+  public long getNumReceived() {
+    return this.received.get();
+  }
+
+  @Override
+  public long getNumUnhandledErrors() {
+    return this.errors.get();
+  }
+
+  @Override
+  public double getAvgTime() {
+    long rec = this.received.get();
+    long emit = this.emitted.get();
+    if(rec == 0 && emit == 0 ) {
+      return 0.0;
+    } else if( rec == 0) { //provider instance
+      return this.totalTime.get() / (double) emit;
+    } else {
+      return this.totalTime.get() / ((double) this.received.get() - 
this.errors.get());
     }
+  }
 
-    /**
-     * Increment error count
-     */
-    public void incrementErrorCount() {
-        this.incrementErrorCount(1);
-    }
-
-    /**
-     * Increment error count
-     * @param delta
-     */
-    public void incrementErrorCount(long delta) {
-        this.errors.addAndGet(delta);
-    }
-
-    /**
-     * Increment received count
-     */
-    public void incrementReceivedCount() {
-        this.incrementReceivedCount(1);
-    }
-
-    /**
-     * Increment received count
-     * @param delta
-     */
-    public void incrementReceivedCount(long delta) {
-        this.received.addAndGet(delta);
-    }
-
-    /**
-     * Add the time it takes to process a single datum in milliseconds
-     * @param processTime
-     */
-    public void addTime(long processTime) {
-        synchronized (this) {
-            if(processTime > this.maxTime) {
-                this.maxTime = processTime;
-            }
-        }
-        this.totalTime.addAndGet(processTime);
-    }
-
-    @Override
-    public double getErrorRate() {
-        if(this.received.get() == 0) {
-            return 0.0;
-        }
-        return (double) this.errors.get() / (double) this.received.get();
-    }
-
-    @Override
-    public long getNumEmitted() {
-        return this.emitted.get();
-    }
-
-    @Override
-    public long getNumReceived() {
-        return this.received.get();
-    }
-
-    @Override
-    public long getNumUnhandledErrors() {
-        return this.errors.get();
-    }
-
-    @Override
-    public double getAvgTime() {
-        long rec = this.received.get();
-        long emit = this.emitted.get();
-        if(rec == 0 && emit == 0 ) {
-            return 0.0;
-        } else if( rec == 0) { //provider instance
-            return this.totalTime.get() / (double) emit;
-        } else {
-            return this.totalTime.get() / ((double) this.received.get() - 
this.errors.get());
-        }
-    }
-
-    @Override
-    public long getMaxTime() {
-        return this.maxTime;
-    }
+  @Override
+  public long getMaxTime() {
+    return this.maxTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
index 8ac2e33..062eb04 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -22,42 +22,42 @@ package org.apache.streams.local.counters;
  */
 public interface StreamsTaskCounterMXBean {
 
-    /**
-     * Get the error rate of the streams process calculated by the number of 
errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask}
-     * divided by the number of datums received.
-     * @return error rate
-     */
-    public double getErrorRate();
-
-    /**
-     * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted 
by the streams process
-     * @return number of emitted datums
-     */
-    public long getNumEmitted();
-
-    /**
-     * Get the number of {@link org.apache.streams.core.StreamsDatum}s 
received by the streams process
-     * @return number of received datums
-     */
-    public long getNumReceived();
-
-    /**
-     * Get the number of errors that the process had to catch because the 
executing Provider/Processor/Writer did not
-     * catch and handle the exception
-     * @return number of handled errors
-     */
-    public long getNumUnhandledErrors();
-
-    /**
-     * Returns the average time in milliseconds it takes the task to 
readCurrent, process, or write to return.
-     * @return
-     */
-    public double getAvgTime();
-
-    /**
-     * Returns the max time in milliseconds it takes the task to readCurrent, 
process, or write to return.
-     * @return
-     */
-    public long getMaxTime();
+  /**
+   * Get the error rate of the streams process calculated by the number of 
errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask}
+   * divided by the number of datums received.
+   * @return error rate
+   */
+  public double getErrorRate();
+
+  /**
+   * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted 
by the streams process
+   * @return number of emitted datums
+   */
+  public long getNumEmitted();
+
+  /**
+   * Get the number of {@link org.apache.streams.core.StreamsDatum}s received 
by the streams process
+   * @return number of received datums
+   */
+  public long getNumReceived();
+
+  /**
+   * Get the number of errors that the process had to catch because the 
executing Provider/Processor/Writer did not
+   * catch and handle the exception
+   * @return number of handled errors
+   */
+  public long getNumUnhandledErrors();
+
+  /**
+   * Returns the average time in milliseconds it takes the task to 
readCurrent, process, or write to return.
+   * @return
+   */
+  public double getAvgTime();
+
+  /**
+   * Returns the max time in milliseconds it takes the task to readCurrent, 
process, or write to return.
+   * @return
+   */
+  public long getMaxTime();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index c9cfec4..38bda24 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -19,10 +19,13 @@
 package org.apache.streams.local.executors;
 
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending 
execution due to an unhandled throwable.
@@ -30,35 +33,35 @@ import java.util.concurrent.*;
  */
 public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends 
ThreadPoolExecutor {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
 
-    private LocalStreamBuilder streamBuilder;
-    private volatile boolean isStoped;
+  private LocalStreamBuilder streamBuilder;
+  private volatile boolean isStoped;
 
-    /**
-     * Creates a fixed size thread pool where corePoolSize & maximumPoolSize 
equal numThreads with an unbounded queue.
-     * @param numThreads number of threads in pool
-     * @param streamBuilder streambuilder to call {@link 
org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled 
throwable
-     */
-    public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, 
LocalStreamBuilder streamBuilder) {
-        super(numThreads, numThreads, 1, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>());
-        this.streamBuilder = streamBuilder;
-        this.isStoped = false;
-    }
+  /**
+   * Creates a fixed size thread pool where corePoolSize & maximumPoolSize 
equal numThreads with an unbounded queue.
+   * @param numThreads number of threads in pool
+   * @param streamBuilder streambuilder to call {@link 
org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled 
throwable
+   */
+  public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, 
LocalStreamBuilder streamBuilder) {
+    super(numThreads, numThreads, 1, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>());
+    this.streamBuilder = streamBuilder;
+    this.isStoped = false;
+  }
 
-    @Override
-    protected void afterExecute(Runnable r, Throwable t) {
-        if(t != null) {
-            LOGGER.error("Runnable, {}, exited with an unhandled throwable! : 
{}", r.getClass(), t);
-            LOGGER.error("Attempting to shut down stream.");
-            synchronized (this) {
-                if (!this.isStoped) {
-                    this.isStoped = true;
-                    this.streamBuilder.stop();
-                }
-            }
-        } else {
-            LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    if(t != null) {
+      LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", 
r.getClass(), t);
+      LOGGER.error("Attempting to shut down stream.");
+      synchronized (this) {
+        if (!this.isStoped) {
+          this.isStoped = true;
+          this.streamBuilder.stop();
         }
+      }
+    } else {
+      LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
     }
+  }
 }

Reply via email to