Repository: incubator-streams
Updated Branches:
  refs/heads/master 9b3227141 -> eec3aa997


Deprecated the printable monitoring solution and turned it off by default


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d547839
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d547839
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d547839

Branch: refs/heads/master
Commit: 3d5478393acce626ca7482fbd27e2b75e74d8799
Parents: c030d43
Author: Ryan Ebanks <[email protected]>
Authored: Fri Oct 17 17:36:00 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Fri Oct 17 17:36:00 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 51 ++++++++++++++++----
 .../streams/local/builders/StreamComponent.java |  1 +
 .../tasks/LocalStreamProcessMonitorThread.java  |  1 +
 .../tasks/StatusCounterMonitorRunnable.java     |  1 +
 .../local/tasks/StatusCounterMonitorThread.java |  1 +
 5 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index bec1ff9..020dbfb 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -57,6 +57,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private LocalStreamProcessMonitorThread monitorThread;
     private Map<String, List<StreamsTask>> tasks;
     private Thread shutdownHook;
+    private boolean useDepricatedMonitoring;
 
     /**
      *
@@ -101,6 +102,32 @@ public class LocalStreamBuilder implements StreamBuilder {
                 self.stopInternal(true);
             }
         };
+        this.useDepricatedMonitoring = false;
+    }
+
+    /**
+     * Enables the deprecated monitoring that uses extra threads to print to 
the console.
+     *
+     * Current monitoring uses MXBeans to publish information to be viewable 
through JMX.
+     *
+     * THIS MUST BE SET BEFORE ANY PROVIDERS/PROCESSORS/WRITERS ARE ADDED.
+     *
+     * CORRECT USAGE:
+     * localStreamsBuilder.useDepricatedMonitoring(true);
+     * localStreamsBuilder.newPerpetual....
+     * localStreamsBuilder.add.......
+     * localStreamsBuilder.start();
+     *
+     * INCORRECT:
+     * localStreamsBuilder.newPerpetual....
+     * localStreamsBuilder.add.......
+     * localStreamsBuilder.useDepricatedMonitoring(true);
+     * localStreamsBuilder.start();
+     *
+     * @param use true to use the deprecated monitoring.
+     */
+    public void useDepricatedMonitoring(boolean use) {
+        this.useDepricatedMonitoring = true;
     }
 
     @Override
@@ -108,7 +135,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, true));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -118,7 +145,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, false));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -128,7 +155,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, sequence));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -138,7 +165,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         validateId(id);
         this.providers.put(id, new StreamComponent(id, provider, start, end));
         ++this.totalTasks;
-        if( provider instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && provider instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -150,7 +177,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
-        if( processor instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && processor instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -162,7 +189,7 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
-        if( writer instanceof DatumStatusCountable )
+        if(this.useDepricatedMonitoring && writer instanceof 
DatumStatusCountable )
             ++this.monitorTasks;
         return this;
     }
@@ -175,11 +202,15 @@ public class LocalStreamBuilder implements StreamBuilder {
         attachShutdownHandler();
         boolean isRunning = true;
         this.executor = new 
ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
-        this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
+        if(this.useDepricatedMonitoring) {
+            this.monitor = Executors.newFixedThreadPool(this.monitorTasks + 1);
+        }
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, 
StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();
         try {
-            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
+            if(this.useDepricatedMonitoring ) {
+                monitorThread = new LocalStreamProcessMonitorThread(executor, 
10);
+            }
             this.monitor.submit(monitorThread);
             setupComponentTasks(tasks);
             setupProviderTasks(provTasks);
@@ -264,7 +295,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             task.setStreamConfig(this.streamConfig);
             this.executor.submit(task);
             provTasks.put(prov.getId(), (StreamsProviderTask) task);
-            if( prov.isOperationCountable() ) {
+            if(this.useDepricatedMonitoring && prov.isOperationCountable() ) {
                 this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
                 this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
             }
@@ -280,7 +311,7 @@ public class LocalStreamBuilder implements StreamBuilder {
                 task.setStreamConfig(this.streamConfig);
                 this.executor.submit(task);
                 compTasks.add(task);
-                if( comp.isOperationCountable() ) {
+                if(this.useDepricatedMonitoring && comp.isOperationCountable() 
) {
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
                     this.monitor.submit(new 
StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 8611bbb..f824bff 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -271,6 +271,7 @@ public class StreamComponent {
         }
     }
 
+    @Deprecated
     protected boolean isOperationCountable() {
         return getOperation() instanceof DatumStatusCountable;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
index 562d17d..e93ee1d 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java
@@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 import java.util.concurrent.Executor;
 
+@Deprecated
 public class LocalStreamProcessMonitorThread implements 
StatusCounterMonitorRunnable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
index b431216..5d4d8b5 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.local.tasks;
 
+@Deprecated
 public interface StatusCounterMonitorRunnable extends Runnable {
     void shutdown();
     boolean isRunning();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d547839/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
index 7a4bb12..c5413db 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java
@@ -22,6 +22,7 @@ import org.apache.streams.core.DatumStatusCountable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Deprecated
 public class StatusCounterMonitorThread implements 
StatusCounterMonitorRunnable {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StatusCounterMonitorThread.class);
 

Reply via email to