Removed use of deprecated monitors

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/29d0a189
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/29d0a189
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/29d0a189

Branch: refs/heads/master
Commit: 29d0a1897870a75bff59f31a5db52f474bfe8aaf
Parents: ce67d58
Author: Ryan Ebanks <[email protected]>
Authored: Mon Dec 8 16:58:21 2014 -0600
Committer: Ryan Ebanks <[email protected]>
Committed: Mon Dec 8 16:58:21 2014 -0600

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 23 ++++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/29d0a189/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 7938247..370d37f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -69,6 +69,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private int maxQueueCapacity;
     private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
     private DateTime startedAt = new DateTime();
+    private boolean useDepricatedMonitors;
 
     /**
      * Creates a local stream builder with no config object and default 
maximum internal queue size of 500
@@ -120,18 +121,22 @@ public class LocalStreamBuilder implements StreamBuilder {
         if(this.streamConfig != null) {
             this.streamConfig.put(DEFAULT_STARTED_AT_KEY, 
startedAt.getMillis());
         }
-
+        this.useDepricatedMonitors = false;
         this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
 
         this.futures = new HashMap<>();
     }
 
+    public void setUseDepricatedMonitors(boolean useDepricatedMonitors) {
+        this.useDepricatedMonitors = useDepricatedMonitors;
+    }
+
     @Override
     public StreamBuilder newPerpetualStream(String id, StreamsProvider 
provider) {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, true, 
streamConfig));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -141,7 +146,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, false, 
streamConfig));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -151,7 +156,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, sequence, 
streamConfig));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -161,7 +166,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, start, end, 
streamConfig));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -173,7 +178,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
-        if( processor instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && processor instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -185,7 +190,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
-        if( writer instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitors && writer instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -296,7 +301,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             task.setStreamsTaskCounter(counter);
             this.executor.submit(task);
             provTasks.put(prov.getId(), (StreamsProviderTask) task);
-            if( prov.isOperationCountable() ) {
+            if(this.useDepricatedMonitors && prov.isOperationCountable() ) {
                 this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
                 this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
             }
@@ -314,7 +319,7 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
-                if( comp.isOperationCountable() ) {
+                if(this.useDepricatedMonitors &&  comp.isOperationCountable() 
) {
                     this.monitor.submit(broadcastMonitor);
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));

Reply via email to