Removed use of deprecated monitors
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/29d0a189 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/29d0a189 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/29d0a189 Branch: refs/heads/master Commit: 29d0a1897870a75bff59f31a5db52f474bfe8aaf Parents: ce67d58 Author: Ryan Ebanks <[email protected]> Authored: Mon Dec 8 16:58:21 2014 -0600 Committer: Ryan Ebanks <[email protected]> Committed: Mon Dec 8 16:58:21 2014 -0600 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 23 ++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/29d0a189/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 7938247..370d37f 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -69,6 +69,7 @@ public class LocalStreamBuilder implements StreamBuilder { private int maxQueueCapacity; private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER; private DateTime startedAt = new DateTime(); + private boolean useDepricatedMonitors; /** * Creates a local stream builder with no config object and default maximum internal queue size of 500 @@ -120,18 +121,22 @@ public class LocalStreamBuilder 
implements StreamBuilder { if(this.streamConfig != null) { this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis()); } - + this.useDepricatedMonitors = false; this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig); this.futures = new HashMap<>(); } + public void setUseDepricatedMonitors(boolean useDepricatedMonitors) { + this.useDepricatedMonitors = useDepricatedMonitors; + } + @Override public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) { validateId(id); this.providers.put(id, new StreamComponent(id, provider, true, streamConfig)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -141,7 +146,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, false, streamConfig)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -151,7 +156,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -161,7 +166,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -173,7 +178,7 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, 
comp); this.totalTasks += numTasks; - if( processor instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && processor instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -185,7 +190,7 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; - if( writer instanceof DatumStatusCountable ) + if(this.useDepricatedMonitors && writer instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -296,7 +301,7 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamsTaskCounter(counter); this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); - if( prov.isOperationCountable() ) { + if(this.useDepricatedMonitors && prov.isOperationCountable() ) { this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } @@ -314,7 +319,7 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.futures.put(task, this.executor.submit(task)); compTasks.add(task); - if( comp.isOperationCountable() ) { + if(this.useDepricatedMonitors && comp.isOperationCountable() ) { this.monitor.submit(broadcastMonitor); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
