Repository: incubator-streams Updated Branches: refs/heads/master 9b3227141 -> eec3aa997
Deprecated the printable monitoring solution and turned it off by default Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d547839 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d547839 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d547839 Branch: refs/heads/master Commit: 3d5478393acce626ca7482fbd27e2b75e74d8799 Parents: c030d43 Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 17 17:36:00 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 17 17:36:00 2014 -0500 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 51 ++++++++++++++++---- .../streams/local/builders/StreamComponent.java | 1 + .../tasks/LocalStreamProcessMonitorThread.java | 1 + .../tasks/StatusCounterMonitorRunnable.java | 1 + .../local/tasks/StatusCounterMonitorThread.java | 1 + 5 files changed, 45 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index bec1ff9..020dbfb 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -57,6 +57,7 @@ public class LocalStreamBuilder implements StreamBuilder { private LocalStreamProcessMonitorThread monitorThread; 
private Map<String, List<StreamsTask>> tasks; private Thread shutdownHook; + private boolean useDepricatedMonitoring; /** * @@ -101,6 +102,32 @@ public class LocalStreamBuilder implements StreamBuilder { self.stopInternal(true); } }; + this.useDepricatedMonitoring = false; + } + + /** + * Enables the deprecated monitoring that uses extra threads to print to the console. + * + * Current monitoring uses MXBeans to publish information to be viewable through JMX. + * + * THIS MUST SET BEFORE ANY PROVIDERS/PROCESSORS/WRITERS ARE SET. + * + * CORRECT USAGE: + * localStreamsBuilder.usedDepricatedMonitoring(true); + * localStreamsBuilder.newPerpetual.... + * localStreamsBuilder.add....... + * localStreamsBuilder.start(); + * + * INCORRECT: + * localStreamsBuilder.newPerpetual.... + * localStreamsBuilder.add....... + * localStreamsBuilder.usedDepricatedMonitoring(true); + * localStreamsBuilder.start(); + * + * @param use true to used depricated monitoring. + */ + public void useDepricatedMonitoring(boolean use) { + this.useDepricatedMonitoring = true; } @Override @@ -108,7 +135,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, true)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitoring && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -118,7 +145,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, false)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitoring && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -128,7 +155,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, sequence)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + 
if(this.useDepricatedMonitoring && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -138,7 +165,7 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, start, end)); ++this.totalTasks; - if( provider instanceof DatumStatusCountable ) + if(this.useDepricatedMonitoring && provider instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -150,7 +177,7 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; - if( processor instanceof DatumStatusCountable ) + if(this.useDepricatedMonitoring && processor instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -162,7 +189,7 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; - if( writer instanceof DatumStatusCountable ) + if(this.useDepricatedMonitoring && writer instanceof DatumStatusCountable ) ++this.monitorTasks; return this; } @@ -175,11 +202,15 @@ public class LocalStreamBuilder implements StreamBuilder { attachShutdownHandler(); boolean isRunning = true; this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); - this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); + if(this.useDepricatedMonitoring) { + this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1); + } Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); tasks = new HashMap<String, List<StreamsTask>>(); try { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + if(this.useDepricatedMonitoring ) { + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + } this.monitor.submit(monitorThread); setupComponentTasks(tasks); setupProviderTasks(provTasks); @@ -264,7 
+295,7 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); - if( prov.isOperationCountable() ) { + if(this.useDepricatedMonitoring && prov.isOperationCountable() ) { this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } @@ -280,7 +311,7 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.executor.submit(task); compTasks.add(task); - if( comp.isOperationCountable() ) { + if(this.useDepricatedMonitoring && comp.isOperationCountable() ) { this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 8611bbb..f824bff 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -271,6 +271,7 @@ public class StreamComponent { } } + @Deprecated protected boolean isOperationCountable() { return getOperation() instanceof DatumStatusCountable; } 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java index 562d17d..e93ee1d 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java @@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.util.concurrent.Executor; +@Deprecated public class LocalStreamProcessMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java index b431216..5d4d8b5 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java @@ -18,6 +18,7 @@ package org.apache.streams.local.tasks; 
+@Deprecated public interface StatusCounterMonitorRunnable extends Runnable { void shutdown(); boolean isRunning(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java index 7a4bb12..c5413db 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java @@ -22,6 +22,7 @@ import org.apache.streams.core.DatumStatusCountable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated public class StatusCounterMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class);
