Repository: airavata Updated Branches: refs/heads/lahiru/AIRAVATA-2107 97247e39a -> b220509ca
Use MinMaxCounters and add more metrics to measure threadpool sizes Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b220509c Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b220509c Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b220509c Branch: refs/heads/lahiru/AIRAVATA-2107 Commit: b220509caf7a1a060617d6662ad063c17fea678c Parents: 97247e3 Author: Lahiru Ginnaliya Gamathige <[email protected]> Authored: Sat Oct 1 19:14:16 2016 -0700 Committer: Lahiru Ginnaliya Gamathige <[email protected]> Committed: Sat Oct 1 19:14:16 2016 -0700 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 8 +++--- .../gfac/core/GFacThreadPoolExecutor.java | 11 +++++++- .../airavata/gfac/impl/HPCRemoteCluster.java | 16 ++++++------ .../org/apache/airavata/gfac/impl/SSHUtils.java | 10 ++++---- .../gfac/monitor/email/EmailBasedMonitor.java | 23 +++++++++++------ .../airavata/gfac/server/GfacServerHandler.java | 23 ++++++++++++++--- .../core/impl/GFACPassiveJobSubmitter.java | 2 +- .../server/OrchestratorServerHandler.java | 27 ++++++++++++-------- .../OrchestratorServerThreadPoolExecutor.java | 21 ++++++++++++--- .../org/apache/airavata/server/ServerMain.java | 1 + 10 files changed, 96 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index b9b6d03..fe076ee 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -22,7 +22,7 @@ package org.apache.airavata.api.server.handler; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import org.apache.airavata.api.Airavata; import org.apache.airavata.api.airavata_apiConstants; import org.apache.airavata.api.server.security.interceptor.SecurityCheck; @@ -93,9 +93,9 @@ public class AiravataServerHandler implements Airavata.Iface { private Publisher statusPublisher; private Publisher experimentPublisher; private CredentialStoreService.Client csClient; - private Counter experimentPublishCount = Kamon.metrics().counter(String.format("%s.experiment.publish-count", getClass().getCanonicalName())); - private Counter experimentLaunchPublishCount = Kamon.metrics().counter(String.format("%s.experiment_launch.publish-count", getClass().getCanonicalName())); - private Counter experimentCancelPublishCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.publish-count", getClass().getCanonicalName())); + private MinMaxCounter experimentPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment.publish-count", getClass().getName())); + private MinMaxCounter experimentLaunchPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.publish-count", getClass().getName())); + private MinMaxCounter experimentCancelPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.publish-count", getClass().getName())); public AiravataServerHandler() { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java index 19073e6..bf6dca6 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.gfac.core; +import kamon.Kamon; +import kamon.metric.instrument.Histogram; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.slf4j.Logger; @@ -26,12 +28,16 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; public class GFacThreadPoolExecutor { private final static Logger logger = LoggerFactory.getLogger(GFacThreadPoolExecutor.class); public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size"; private static ExecutorService threadPool; + private static Histogram threadPoolQueueSize = Kamon.metrics().histogram("GFacThreadPoolExecutor.queue-size"); + private static Histogram threadPoolActiveThreads = Kamon.metrics().histogram("GFacThreadPoolExecutor.active-threads"); + public static ExecutorService getCachedThreadPool() { if(threadPool ==null){ @@ -52,5 +58,8 @@ public class GFacThreadPoolExecutor { return threadPool; } - + public static void record() { + threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size()); + threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount()); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index 9c97d37..cc320a1 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -26,7 +26,7 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.UserInfo; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.gfac.core.JobManagerConfiguration; import org.apache.airavata.gfac.core.SSHApiException; @@ -53,13 +53,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ private final SSHKeyAuthentication authentication; private final JSch jSch; private Session session; - private Counter submittedJobCount = Kamon.metrics().counter(String.format("%s.submitted-jobs", getClass().getCanonicalName())); - private Counter nonZeroExitCodeJobCount = Kamon.metrics().counter(String.format("%s.nonzero-exit-jobs", getClass().getCanonicalName())); - private Counter emptyJobIdCount = Kamon.metrics().counter(String.format("%s.empty-jobid-jobs", getClass().getCanonicalName())); - private Counter copyToFailCount = Kamon.metrics().counter(String.format("%s.copyTo-fail", getClass().getCanonicalName())); - private Counter copyFromFailCount = Kamon.metrics().counter(String.format("%s.copyFrom-fail", getClass().getCanonicalName())); - private Counter mkDirFailCount = Kamon.metrics().counter(String.format("%s.mkDir-fail", getClass().getCanonicalName())); - private Counter listFailCount = Kamon.metrics().counter(String.format("%s.list-fail", getClass().getCanonicalName())); + private MinMaxCounter submittedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.submitted-jobs", getClass().getName())); + private MinMaxCounter nonZeroExitCodeJobCount = Kamon.metrics().minMaxCounter(String.format("%s.nonzero-exit-jobs", getClass().getName())); + private MinMaxCounter emptyJobIdCount = Kamon.metrics().minMaxCounter(String.format("%s.empty-jobid-jobs", getClass().getName())); + private MinMaxCounter copyToFailCount = Kamon.metrics().minMaxCounter(String.format("%s.copyTo-fail", getClass().getName())); + private MinMaxCounter copyFromFailCount = Kamon.metrics().minMaxCounter(String.format("%s.copyFrom-fail", getClass().getName())); + private MinMaxCounter mkDirFailCount = Kamon.metrics().minMaxCounter(String.format("%s.mkDir-fail", getClass().getName())); + private MinMaxCounter listFailCount = Kamon.metrics().minMaxCounter(String.format("%s.list-fail", getClass().getName())); public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java index 2f59828..3f79358 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java @@ -25,7 +25,7 @@ import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import kamon.metric.instrument.Histogram; import org.apache.airavata.gfac.core.SSHApiException; import org.slf4j.Logger; @@ -45,11 +45,11 @@ import java.util.List; */ public class SSHUtils { private static final Logger log = LoggerFactory.getLogger(SSHUtils.class); - private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getCanonicalName())); - private static Counter scpToFailedCount = Kamon.metrics().counter(String.format("%s.scpTo-fail", SSHUtils.class.getCanonicalName())); + private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getName())); + private static MinMaxCounter scpToFailedCount = Kamon.metrics().minMaxCounter(String.format("%s.scpTo-fail", SSHUtils.class.getName())); - private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getCanonicalName())); - private static Counter scpFromFailedCount = Kamon.metrics().counter(String.format("%s.scpFrom-fail", SSHUtils.class.getCanonicalName())); + private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getName())); + private static MinMaxCounter scpFromFailedCount = Kamon.metrics().minMaxCounter(String.format("%s.scpFrom-fail", SSHUtils.class.getName())); /** * This will copy a local file to a remote location http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index d1afdd6..69de6b2 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -21,7 +21,7 @@ package org.apache.airavata.gfac.monitor.email; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import kamon.metric.instrument.Histogram; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; @@ -53,6 +53,7 @@ import javax.mail.search.FlagTerm; import javax.mail.search.SearchTerm; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; public class EmailBasedMonitor implements JobMonitor, Runnable{ private static final Logger log = LoggerFactory.getLogger(EmailBasedMonitor.class); @@ -74,10 +75,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ private Message[] flushUnseenMessages; private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>(); private Timer timer; - private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getCanonicalName())); - private Histogram cancelledJobs = Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getCanonicalName())); - private Counter completedJobCount = Kamon.metrics().counter(String.format("%s.completed-jobs", getClass().getCanonicalName())); - private Counter failedJobCount = Kamon.metrics().counter(String.format("%s.failed-jobs", getClass().getCanonicalName())); + private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getName())); + private Histogram cancelledJobs = Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getName())); + private MinMaxCounter completedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.completed-jobs", getClass().getName())); + private MinMaxCounter failedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.failed-jobs", getClass().getName())); + private ExecutorService cachedThreadPool; public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException { init(); @@ -99,6 +101,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ timer = new Timer("CancelJobHandler", true); long period = 1000 * 60 * 5; // five minute delay between successive task executions. timer.schedule(new CancelTimerTask(), 0 , period); + cachedThreadPool = GFacThreadPoolExecutor.getCachedThreadPool(); } private void populateAddressAndParserMap(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException { @@ -133,6 +136,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ public void stopMonitor(String jobId, boolean runOutflow) { TaskContext taskContext = jobMonitorMap.remove(jobId); monitorQueueSize.record(jobMonitorMap.size()); + GFacThreadPoolExecutor.record(); if (taskContext != null && runOutflow) { try { ProcessContext pc = taskContext.getParentProcessContext(); @@ -150,12 +154,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ pStatus.setReason("Job cancelled"); pc.setProcessStatus(pStatus); GFacUtils.saveAndPublishProcessStatus(pc); - GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(pc)); + cachedThreadPool.execute(new GFacWorker(pc)); } catch (GFacException e) { log.info("[EJM]: Error while running output tasks", e); } } - } + GFacThreadPoolExecutor.record(); + } @Override public boolean isMonitoring(String jobId) { @@ -338,6 +343,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ } private void process(JobStatusResult jobStatusResult, TaskContext taskContext){ + GFacThreadPoolExecutor.record(); canceledJobs.remove(jobStatusResult.getJobId()); cancelledJobs.record(canceledJobs.size()); JobState resultState = jobStatusResult.getState(); @@ -413,11 +419,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ parentProcessContext.setProcessStatus(processStatus); GFacUtils.saveAndPublishProcessStatus(parentProcessContext); } - GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(parentProcessContext)); + cachedThreadPool.execute(new GFacWorker(parentProcessContext)); } catch (GFacException e) { log.info("[EJM]: Error while running output tasks", e); } } + GFacThreadPoolExecutor.record(); } private void writeEnvelopeOnError(Message m) throws MessagingException { http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 086093c..035543e 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -21,7 +21,8 @@ package org.apache.airavata.gfac.server; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.Histogram; +import kamon.metric.instrument.MinMaxCounter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.AiravataStartupException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -62,6 +63,7 @@ import org.apache.thrift.TException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -73,11 +75,13 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import static sun.java2d.Disposer.getQueue; public class GfacServerHandler implements GfacService.Iface { private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class); private Subscriber processLaunchSubscriber; - private static int requestCount=0; private ExperimentCatalog experimentCatalog; private AppCatalog appCatalog; private String airavataUserName; @@ -87,7 +91,11 @@ public class GfacServerHandler implements GfacService.Iface { private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); private ExecutorService executorService; - private Counter consumedCount = Kamon.metrics().counter(String.format("%s.consumed-count", getClass().getCanonicalName())); + private MinMaxCounter consumedCount = Kamon.metrics().minMaxCounter(String.format("%s.consumed-count", getClass().getName())); + private Histogram threadPoolQueueSize = Kamon.metrics().histogram(String.format("%s.queue-size", getClass().getName())); + private Histogram threadPoolActiveThreads = Kamon.metrics().histogram(String.format("%s.active-threads", getClass().getName())); + private Histogram threadPoolTotalThreads = Kamon.metrics().histogram(String.format("%s.total-threads", getClass().getName())); + public GfacServerHandler() throws AiravataStartupException { try { @@ -163,7 +171,8 @@ public class GfacServerHandler implements GfacService.Iface { MDC.put(MDCConstants.GATEWAY_ID, gatewayId); MDC.put(MDCConstants.TOKEN_ID, tokenId); try { - executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId))); + recordThreadPool(); + executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId))); } catch (GFacException e) { log.error("Failed to submit process", e); @@ -174,6 +183,12 @@ public class GfacServerHandler implements GfacService.Iface { return true; } + private void recordThreadPool() { + threadPoolQueueSize.record(((ThreadPoolExecutor)executorService).getQueue().size()); + threadPoolActiveThreads.record(((ThreadPoolExecutor)executorService).getActiveCount()); + threadPoolTotalThreads.record(((ThreadPoolExecutor)executorService).getPoolSize()); + } + @Override public boolean cancelProcess(String processId, String gatewayId, String tokenId) throws TException { return false; http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 38f4e97..5d2a353 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -21,7 +21,7 @@ package org.apache.airavata.orchestrator.core.impl; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index b617016..bde61c9 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -22,7 +22,7 @@ package org.apache.airavata.orchestrator.server; import kamon.Kamon; -import kamon.metric.instrument.Counter; +import kamon.metric.instrument.MinMaxCounter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logging.MDCConstants; @@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import java.util.*; +import java.util.concurrent.ExecutorService; public class OrchestratorServerHandler implements OrchestratorService.Iface { private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class); @@ -87,15 +88,16 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private final Subscriber statusSubscribe; private final Subscriber experimentSubscriber; private CuratorFramework curatorClient; - private Counter publishCount = Kamon.metrics().counter(String.format("%s.publish-count", getClass().getCanonicalName())); - private Counter publishFail = Kamon.metrics().counter(String.format("%s.publish-fail-count", getClass().getCanonicalName())); - private Counter processConsumeCount = Kamon.metrics().counter(String.format("%s.process.consume-count", getClass().getCanonicalName())); - private Counter experimentConsumeCount = Kamon.metrics().counter(String.format("%s.experiment.consume-count", getClass().getCanonicalName())); - private Counter experimentLaunchConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_launch.consume-count", getClass().getCanonicalName())); - private Counter experimentCancelConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.consume-count", getClass().getCanonicalName())); - private Counter unsupportedMessageCount = Kamon.metrics().counter(String.format("%s.unsupported-count", getClass().getCanonicalName())); + private MinMaxCounter publishCount = Kamon.metrics().minMaxCounter(String.format("%s.publish-count", getClass().getName())); + private MinMaxCounter publishFail = Kamon.metrics().minMaxCounter(String.format("%s.publish-fail-count", getClass().getName())); + private MinMaxCounter processConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.process.consume-count", getClass().getName())); + private MinMaxCounter experimentConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment.consume-count", getClass().getName())); + private MinMaxCounter experimentLaunchConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.consume-count", getClass().getName())); + private MinMaxCounter experimentCancelConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.consume-count", getClass().getName())); + private MinMaxCounter unsupportedMessageCount = Kamon.metrics().minMaxCounter(String.format("%s.unsupported-count", getClass().getName())); + private ExecutorService cachedThreadPool; - /** + /** * Query orchestrator server to fetch the CPI version */ public String getOrchestratorCPIVersion() throws TException { @@ -109,6 +111,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName()); experimentSubscriber = MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, Type.EXPERIMENT_LAUNCH); setAiravataUserName(ServerSettings.getDefaultUser()); + cachedThreadPool = OrchestratorServerThreadPoolExecutor.getCachedThreadPool(); } catch (AiravataException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); @@ -144,7 +147,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { */ public boolean launchExperiment(String experimentId, String gatewayId) throws TException { ExperimentModel experiment = null; - try { + OrchestratorServerThreadPoolExecutor.record(); + try { String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); @@ -237,7 +241,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); log.info("expId: {}, Launched experiment ", experimentId); - OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId))); + cachedThreadPool.execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId))); } else if (executionType == ExperimentType.WORKFLOW) { //its a workflow execution experiment log.debug(experimentId, "Launching workflow experiment {}.", experimentId); @@ -249,6 +253,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } catch (Exception e) { throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId(), e); } + OrchestratorServerThreadPoolExecutor.record(); return true; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java index 3fdba74..a57419c 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java @@ -23,20 +23,27 @@ package org.apache.airavata.orchestrator.util; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import kamon.Kamon; +import kamon.metric.instrument.Histogram; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class OrchestratorServerThreadPoolExecutor { - private final static Logger logger = LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class); - public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = "airavata.server.thread.pool.size"; + private final static Logger logger = LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class); + public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = "airavata.server.thread.pool.size"; - private static ExecutorService threadPool; + private static ExecutorService threadPool; + private static Histogram threadPoolQueueSize = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.queue-size"); + private static Histogram threadPoolActiveThreads = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.active-threads"); + private static Histogram threadPoolTotalThreads = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.total-threads"); - public static ExecutorService getCachedThreadPool() { + + public static ExecutorService getCachedThreadPool() { if(threadPool ==null){ threadPool = Executors.newCachedThreadPool(); } @@ -53,4 +60,10 @@ public class OrchestratorServerThreadPoolExecutor { } return threadPool; } + + public static void record() { + threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size()); + threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount()); + threadPoolTotalThreads.record(((ThreadPoolExecutor)threadPool).getPoolSize()); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 8660974..7bce436 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -222,6 +222,7 @@ public class ServerMain { } } if (hasStopRequested()){ + Kamon.shutdown(); ServerSettings.setStopAllThreads(true); stopAllServers(); ShutdownStrategy shutdownStrategy;
