Repository: airavata
Updated Branches:
  refs/heads/lahiru/AIRAVATA-2107 97247e39a -> b220509ca


Use MinMaxCounters and add more metrics to measure threadpool sizes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b220509c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b220509c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b220509c

Branch: refs/heads/lahiru/AIRAVATA-2107
Commit: b220509caf7a1a060617d6662ad063c17fea678c
Parents: 97247e3
Author: Lahiru Ginnaliya Gamathige <[email protected]>
Authored: Sat Oct 1 19:14:16 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <[email protected]>
Committed: Sat Oct 1 19:14:16 2016 -0700

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |  8 +++---
 .../gfac/core/GFacThreadPoolExecutor.java       | 11 +++++++-
 .../airavata/gfac/impl/HPCRemoteCluster.java    | 16 ++++++------
 .../org/apache/airavata/gfac/impl/SSHUtils.java | 10 ++++----
 .../gfac/monitor/email/EmailBasedMonitor.java   | 23 +++++++++++------
 .../airavata/gfac/server/GfacServerHandler.java | 23 ++++++++++++++---
 .../core/impl/GFACPassiveJobSubmitter.java      |  2 +-
 .../server/OrchestratorServerHandler.java       | 27 ++++++++++++--------
 .../OrchestratorServerThreadPoolExecutor.java   | 21 ++++++++++++---
 .../org/apache/airavata/server/ServerMain.java  |  1 +
 10 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index b9b6d03..fe076ee 100644
--- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -22,7 +22,7 @@
 package org.apache.airavata.api.server.handler;
 
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavata_apiConstants;
 import org.apache.airavata.api.server.security.interceptor.SecurityCheck;
@@ -93,9 +93,9 @@ public class AiravataServerHandler implements Airavata.Iface {
     private Publisher statusPublisher;
     private Publisher experimentPublisher;
     private CredentialStoreService.Client csClient;
-    private Counter experimentPublishCount = 
Kamon.metrics().counter(String.format("%s.experiment.publish-count", 
getClass().getCanonicalName()));
-    private Counter experimentLaunchPublishCount = 
Kamon.metrics().counter(String.format("%s.experiment_launch.publish-count", 
getClass().getCanonicalName()));
-    private Counter experimentCancelPublishCount = 
Kamon.metrics().counter(String.format("%s.experiment_cancel.publish-count", 
getClass().getCanonicalName()));
+    private MinMaxCounter experimentPublishCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment.publish-count", 
getClass().getName()));
+    private MinMaxCounter experimentLaunchPublishCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.publish-count",
 getClass().getName()));
+    private MinMaxCounter experimentCancelPublishCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.publish-count",
 getClass().getName()));
 
     public AiravataServerHandler() {
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
index 19073e6..bf6dca6 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
@@ -19,6 +19,8 @@
  */
 package org.apache.airavata.gfac.core;
 
+import kamon.Kamon;
+import kamon.metric.instrument.Histogram;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.slf4j.Logger;
@@ -26,12 +28,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class GFacThreadPoolExecutor {
     private final static Logger logger = 
LoggerFactory.getLogger(GFacThreadPoolExecutor.class);
     public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size";
 
     private static ExecutorService threadPool;
+    private static Histogram threadPoolQueueSize = 
Kamon.metrics().histogram("GFacThreadPoolExecutor.queue-size");
+    private static Histogram threadPoolActiveThreads = 
Kamon.metrics().histogram("GFacThreadPoolExecutor.active-threads");
+
 
     public static ExecutorService getCachedThreadPool() {
         if(threadPool ==null){
@@ -52,5 +58,8 @@ public class GFacThreadPoolExecutor {
         return threadPool;
     }
 
-
+    public static void record() {
+        
threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size());
+        
threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount());
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 9c97d37..cc320a1 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -26,7 +26,7 @@ import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.UserInfo;
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -53,13 +53,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        private final SSHKeyAuthentication authentication;
        private final JSch jSch;
        private Session session;
-       private Counter submittedJobCount = 
Kamon.metrics().counter(String.format("%s.submitted-jobs", 
getClass().getCanonicalName()));
-       private Counter nonZeroExitCodeJobCount = 
Kamon.metrics().counter(String.format("%s.nonzero-exit-jobs", 
getClass().getCanonicalName()));
-       private Counter emptyJobIdCount = 
Kamon.metrics().counter(String.format("%s.empty-jobid-jobs", 
getClass().getCanonicalName()));
-       private Counter copyToFailCount = 
Kamon.metrics().counter(String.format("%s.copyTo-fail", 
getClass().getCanonicalName()));
-       private Counter copyFromFailCount = 
Kamon.metrics().counter(String.format("%s.copyFrom-fail", 
getClass().getCanonicalName()));
-       private Counter mkDirFailCount = 
Kamon.metrics().counter(String.format("%s.mkDir-fail", 
getClass().getCanonicalName()));
-       private Counter listFailCount = 
Kamon.metrics().counter(String.format("%s.list-fail", 
getClass().getCanonicalName()));
+       private MinMaxCounter submittedJobCount = 
Kamon.metrics().minMaxCounter(String.format("%s.submitted-jobs", 
getClass().getName()));
+       private MinMaxCounter nonZeroExitCodeJobCount = 
Kamon.metrics().minMaxCounter(String.format("%s.nonzero-exit-jobs", 
getClass().getName()));
+       private MinMaxCounter emptyJobIdCount = 
Kamon.metrics().minMaxCounter(String.format("%s.empty-jobid-jobs", 
getClass().getName()));
+       private MinMaxCounter copyToFailCount = 
Kamon.metrics().minMaxCounter(String.format("%s.copyTo-fail", 
getClass().getName()));
+       private MinMaxCounter copyFromFailCount = 
Kamon.metrics().minMaxCounter(String.format("%s.copyFrom-fail", 
getClass().getName()));
+       private MinMaxCounter mkDirFailCount = 
Kamon.metrics().minMaxCounter(String.format("%s.mkDir-fail", 
getClass().getName()));
+       private MinMaxCounter listFailCount = 
Kamon.metrics().minMaxCounter(String.format("%s.list-fail", 
getClass().getName()));
 
 
        public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration 
jobManagerConfiguration, AuthenticationInfo

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 2f59828..3f79358 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -25,7 +25,7 @@ import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import kamon.metric.instrument.Histogram;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.slf4j.Logger;
@@ -45,11 +45,11 @@ import java.util.List;
  */
 public class SSHUtils {
        private static final Logger log = 
LoggerFactory.getLogger(SSHUtils.class);
-       private static Histogram scpToBytes = 
Kamon.metrics().histogram(String.format("%s.scpTo-bytes", 
SSHUtils.class.getCanonicalName()));
-       private static Counter scpToFailedCount = 
Kamon.metrics().counter(String.format("%s.scpTo-fail", 
SSHUtils.class.getCanonicalName()));
+       private static Histogram scpToBytes = 
Kamon.metrics().histogram(String.format("%s.scpTo-bytes", 
SSHUtils.class.getName()));
+       private static MinMaxCounter scpToFailedCount = 
Kamon.metrics().minMaxCounter(String.format("%s.scpTo-fail", 
SSHUtils.class.getName()));
 
-       private static Histogram scpFromBytes = 
Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", 
SSHUtils.class.getCanonicalName()));
-       private static Counter scpFromFailedCount = 
Kamon.metrics().counter(String.format("%s.scpFrom-fail", 
SSHUtils.class.getCanonicalName()));
+       private static Histogram scpFromBytes = 
Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", 
SSHUtils.class.getName()));
+       private static MinMaxCounter scpFromFailedCount = 
Kamon.metrics().minMaxCounter(String.format("%s.scpFrom-fail", 
SSHUtils.class.getName()));
 
        /**
         * This will copy a local file to a remote location

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index d1afdd6..69de6b2 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,7 +21,7 @@
 package org.apache.airavata.gfac.monitor.email;
 
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import kamon.metric.instrument.Histogram;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
@@ -53,6 +53,7 @@ import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
 public class EmailBasedMonitor implements JobMonitor, Runnable{
     private static final Logger log = 
LoggerFactory.getLogger(EmailBasedMonitor.class);
@@ -74,10 +75,11 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
        private Message[] flushUnseenMessages;
     private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
     private Timer timer;
-    private Histogram monitorQueueSize = 
Kamon.metrics().histogram(String.format("%s.monitor-queue-size", 
getClass().getCanonicalName()));
-    private Histogram cancelledJobs = 
Kamon.metrics().histogram(String.format("%s.cancelled-jobs", 
getClass().getCanonicalName()));
-    private Counter completedJobCount = 
Kamon.metrics().counter(String.format("%s.completed-jobs", 
getClass().getCanonicalName()));
-    private Counter failedJobCount = 
Kamon.metrics().counter(String.format("%s.failed-jobs", 
getClass().getCanonicalName()));
+    private Histogram monitorQueueSize = 
Kamon.metrics().histogram(String.format("%s.monitor-queue-size", 
getClass().getName()));
+    private Histogram cancelledJobs = 
Kamon.metrics().histogram(String.format("%s.cancelled-jobs", 
getClass().getName()));
+    private MinMaxCounter completedJobCount = 
Kamon.metrics().minMaxCounter(String.format("%s.completed-jobs", 
getClass().getName()));
+    private MinMaxCounter failedJobCount = 
Kamon.metrics().minMaxCounter(String.format("%s.failed-jobs", 
getClass().getName()));
+    private ExecutorService cachedThreadPool;
 
     public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> 
resourceConfigs) throws AiravataException {
                init();
@@ -99,6 +101,7 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
         timer = new Timer("CancelJobHandler", true);
         long period = 1000 * 60 * 5; // five minute delay between successive 
task executions.
         timer.schedule(new CancelTimerTask(), 0 , period);
+        cachedThreadPool = GFacThreadPoolExecutor.getCachedThreadPool();
     }
 
        private void populateAddressAndParserMap(Map<ResourceJobManagerType, 
ResourceConfig> resourceConfigs) throws AiravataException {
@@ -133,6 +136,7 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
        public void stopMonitor(String jobId, boolean runOutflow) {
                TaskContext taskContext = jobMonitorMap.remove(jobId);
         monitorQueueSize.record(jobMonitorMap.size());
+        GFacThreadPoolExecutor.record();
         if (taskContext != null && runOutflow) {
                        try {
                 ProcessContext pc = taskContext.getParentProcessContext();
@@ -150,12 +154,13 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
                 pStatus.setReason("Job cancelled");
                 pc.setProcessStatus(pStatus);
                 GFacUtils.saveAndPublishProcessStatus(pc);
-                GFacThreadPoolExecutor.getCachedThreadPool().execute(new 
GFacWorker(pc));
+                cachedThreadPool.execute(new GFacWorker(pc));
                        } catch (GFacException e) {
                                log.info("[EJM]: Error while running output 
tasks", e);
                        }
                }
-       }
+        GFacThreadPoolExecutor.record();
+    }
 
     @Override
     public boolean isMonitoring(String jobId) {
@@ -338,6 +343,7 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
     }
 
     private void process(JobStatusResult jobStatusResult, TaskContext 
taskContext){
+        GFacThreadPoolExecutor.record();
         canceledJobs.remove(jobStatusResult.getJobId());
         cancelledJobs.record(canceledJobs.size());
         JobState resultState = jobStatusResult.getState();
@@ -413,11 +419,12 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
                     parentProcessContext.setProcessStatus(processStatus);
                     
GFacUtils.saveAndPublishProcessStatus(parentProcessContext);
                 }
-                       
GFacThreadPoolExecutor.getCachedThreadPool().execute(new 
GFacWorker(parentProcessContext));
+                       cachedThreadPool.execute(new 
GFacWorker(parentProcessContext));
                } catch (GFacException e) {
                        log.info("[EJM]: Error while running output tasks", e);
                }
         }
+        GFacThreadPoolExecutor.record();
     }
 
     private void writeEnvelopeOnError(Message m) throws MessagingException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 086093c..035543e 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -21,7 +21,8 @@
 package org.apache.airavata.gfac.server;
 
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.Histogram;
+import kamon.metric.instrument.MinMaxCounter;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.AiravataStartupException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -62,6 +63,7 @@ import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
+import org.eclipse.jetty.util.thread.ThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -73,11 +75,13 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static sun.java2d.Disposer.getQueue;
 
 public class GfacServerHandler implements GfacService.Iface {
     private final static Logger log = 
LoggerFactory.getLogger(GfacServerHandler.class);
     private Subscriber processLaunchSubscriber;
-    private static int requestCount=0;
     private ExperimentCatalog experimentCatalog;
     private AppCatalog appCatalog;
     private String airavataUserName;
@@ -87,7 +91,11 @@ public class GfacServerHandler implements GfacService.Iface {
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
     private static List<AbstractActivityListener> activityListeners = new 
ArrayList<AbstractActivityListener>();
     private ExecutorService executorService;
-    private Counter consumedCount = 
Kamon.metrics().counter(String.format("%s.consumed-count", 
getClass().getCanonicalName()));
+    private MinMaxCounter consumedCount = 
Kamon.metrics().minMaxCounter(String.format("%s.consumed-count", 
getClass().getName()));
+    private Histogram threadPoolQueueSize = 
Kamon.metrics().histogram(String.format("%s.queue-size", getClass().getName()));
+    private Histogram threadPoolActiveThreads = 
Kamon.metrics().histogram(String.format("%s.active-threads", 
getClass().getName()));
+    private Histogram threadPoolTotalThreads = 
Kamon.metrics().histogram(String.format("%s.total-threads", 
getClass().getName()));
+
 
     public GfacServerHandler() throws AiravataStartupException {
         try {
@@ -163,7 +171,8 @@ public class GfacServerHandler implements GfacService.Iface 
{
         MDC.put(MDCConstants.GATEWAY_ID, gatewayId);
         MDC.put(MDCConstants.TOKEN_ID, tokenId);
         try {
-               executorService.execute(MDCUtil.wrapWithMDC(new 
GFacWorker(processId, gatewayId, tokenId)));
+            recordThreadPool();
+            executorService.execute(MDCUtil.wrapWithMDC(new 
GFacWorker(processId, gatewayId, tokenId)));
         } catch (GFacException e) {
             log.error("Failed to submit process", e);
 
@@ -174,6 +183,12 @@ public class GfacServerHandler implements 
GfacService.Iface {
            return true;
     }
 
+    private void recordThreadPool() {
+        
threadPoolQueueSize.record(((ThreadPoolExecutor)executorService).getQueue().size());
+        
threadPoolActiveThreads.record(((ThreadPoolExecutor)executorService).getActiveCount());
+        
threadPoolTotalThreads.record(((ThreadPoolExecutor)executorService).getPoolSize());
+    }
+
     @Override
     public boolean cancelProcess(String processId, String gatewayId, String 
tokenId) throws TException {
         return false;

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 38f4e97..5d2a353 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -21,7 +21,7 @@
 package org.apache.airavata.orchestrator.core.impl;
 
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b617016..bde61c9 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -22,7 +22,7 @@
 package org.apache.airavata.orchestrator.server;
 
 import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logging.MDCConstants;
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
        private static Logger log = 
LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -87,15 +88,16 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
        private final Subscriber statusSubscribe;
        private final Subscriber experimentSubscriber;
        private CuratorFramework curatorClient;
-    private Counter publishCount = 
Kamon.metrics().counter(String.format("%s.publish-count", 
getClass().getCanonicalName()));
-    private Counter publishFail = 
Kamon.metrics().counter(String.format("%s.publish-fail-count", 
getClass().getCanonicalName()));
-    private Counter processConsumeCount = 
Kamon.metrics().counter(String.format("%s.process.consume-count", 
getClass().getCanonicalName()));
-    private Counter experimentConsumeCount = 
Kamon.metrics().counter(String.format("%s.experiment.consume-count", 
getClass().getCanonicalName()));
-       private Counter experimentLaunchConsumeCount = 
Kamon.metrics().counter(String.format("%s.experiment_launch.consume-count", 
getClass().getCanonicalName()));
-       private Counter experimentCancelConsumeCount = 
Kamon.metrics().counter(String.format("%s.experiment_cancel.consume-count", 
getClass().getCanonicalName()));
-       private Counter unsupportedMessageCount = 
Kamon.metrics().counter(String.format("%s.unsupported-count", 
getClass().getCanonicalName()));
+    private MinMaxCounter publishCount = 
Kamon.metrics().minMaxCounter(String.format("%s.publish-count", 
getClass().getName()));
+    private MinMaxCounter publishFail = 
Kamon.metrics().minMaxCounter(String.format("%s.publish-fail-count", 
getClass().getName()));
+    private MinMaxCounter processConsumeCount = 
Kamon.metrics().minMaxCounter(String.format("%s.process.consume-count", 
getClass().getName()));
+    private MinMaxCounter experimentConsumeCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment.consume-count", 
getClass().getName()));
+       private MinMaxCounter experimentLaunchConsumeCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.consume-count",
 getClass().getName()));
+       private MinMaxCounter experimentCancelConsumeCount = 
Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.consume-count",
 getClass().getName()));
+       private MinMaxCounter unsupportedMessageCount = 
Kamon.metrics().minMaxCounter(String.format("%s.unsupported-count", 
getClass().getName()));
+       private ExecutorService cachedThreadPool;
 
-    /**
+       /**
         * Query orchestrator server to fetch the CPI version
         */
        public String getOrchestratorCPIVersion() throws TException {
@@ -109,6 +111,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                        
routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
                        experimentSubscriber = 
MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, 
Type.EXPERIMENT_LAUNCH);
                        setAiravataUserName(ServerSettings.getDefaultUser());
+                       cachedThreadPool = 
OrchestratorServerThreadPoolExecutor.getCachedThreadPool();
                } catch (AiravataException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing 
orchestrator service", e);
@@ -144,7 +147,8 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         */
        public boolean launchExperiment(String experimentId, String gatewayId) 
throws TException {
         ExperimentModel experiment = null;
-        try {
+               OrchestratorServerThreadPoolExecutor.record();
+               try {
             String experimentNodePath = GFacUtils.getExperimentNodePath 
(experimentId);
                        
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentNodePath);
                        String experimentCancelNode = 
ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
@@ -237,7 +241,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                 
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                 
OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
                 log.info("expId: {}, Launched experiment ", experimentId);
-                
OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(MDCUtil.wrapWithMDC(new
 SingleAppExperimentRunner(experimentId, token, gatewayId)));
+                               
cachedThreadPool.execute(MDCUtil.wrapWithMDC(new 
SingleAppExperimentRunner(experimentId, token, gatewayId)));
             } else if (executionType == ExperimentType.WORKFLOW) {
                 //its a workflow execution experiment
                 log.debug(experimentId, "Launching workflow experiment {}.", 
experimentId);
@@ -249,6 +253,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         } catch (Exception e) {
             throw new TException("Experiment '" + experimentId + "' launch 
failed. Unable to figureout execution type for application " + 
experiment.getExecutionId(), e);
         }
+        OrchestratorServerThreadPoolExecutor.record();
         return true;
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
index 3fdba74..a57419c 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
@@ -23,20 +23,27 @@ package org.apache.airavata.orchestrator.util;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 
+import kamon.Kamon;
+import kamon.metric.instrument.Histogram;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OrchestratorServerThreadPoolExecutor {
-           private final static Logger logger = 
LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class);
-           public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = 
"airavata.server.thread.pool.size";
+       private final static Logger logger = 
LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class);
+       public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = 
"airavata.server.thread.pool.size";
 
-           private static ExecutorService threadPool;
+       private static ExecutorService threadPool;
+       private static Histogram threadPoolQueueSize = 
Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.queue-size");
+       private static Histogram threadPoolActiveThreads = 
Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.active-threads");
+       private static Histogram threadPoolTotalThreads = 
Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.total-threads");
 
-           public static ExecutorService getCachedThreadPool() {
+
+       public static ExecutorService getCachedThreadPool() {
                if(threadPool ==null){
                    threadPool = Executors.newCachedThreadPool();
                }
@@ -53,4 +60,10 @@ public class OrchestratorServerThreadPoolExecutor {
                }
                return threadPool;
            }
+
+       public static void record() {
+               
threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size());
+               
threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount());
+               
threadPoolTotalThreads.record(((ThreadPoolExecutor)threadPool).getPoolSize());
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git 
a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java 
b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 8660974..7bce436 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -222,6 +222,7 @@ public class ServerMain {
                        }
                }
                if (hasStopRequested()){
+                       Kamon.shutdown();
             ServerSettings.setStopAllThreads(true);
                        stopAllServers();
                        ShutdownStrategy shutdownStrategy;

Reply via email to