More metrics
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/97247e39 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/97247e39 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/97247e39 Branch: refs/heads/lahiru/AIRAVATA-2017 Commit: 97247e39ac15357ba7038e3ed484946a238a8c63 Parents: de75aa9 Author: Lahiru Ginnaliya Gamathige <[email protected]> Authored: Sat Oct 1 13:07:52 2016 -0700 Committer: Lahiru Ginnaliya Gamathige <[email protected]> Committed: Sat Oct 1 13:07:52 2016 -0700 ---------------------------------------------------------------------- airavata-api/airavata-api-server/pom.xml | 4 ++++ .../server/handler/AiravataServerHandler.java | 9 ++++++++ .../airavata/gfac/server/GfacServerHandler.java | 4 ++++ .../core/impl/GFACPassiveJobSubmitter.java | 2 ++ .../server/OrchestratorServerHandler.java | 22 ++++++++++++++++++-- 5 files changed, 39 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/airavata-api/airavata-api-server/pom.xml ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml index e5b549c..cef9910 100644 --- a/airavata-api/airavata-api-server/pom.xml +++ b/airavata-api/airavata-api-server/pom.xml @@ -141,6 +141,10 @@ <artifactId>jackson-mapper-asl</artifactId> <version>1.9.13</version> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 5ccf874..b9b6d03 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -21,6 +21,8 @@ package org.apache.airavata.api.server.handler; +import kamon.Kamon; +import kamon.metric.instrument.Counter; import org.apache.airavata.api.Airavata; import org.apache.airavata.api.airavata_apiConstants; import org.apache.airavata.api.server.security.interceptor.SecurityCheck; @@ -91,6 +93,9 @@ public class AiravataServerHandler implements Airavata.Iface { private Publisher statusPublisher; private Publisher experimentPublisher; private CredentialStoreService.Client csClient; + private Counter experimentPublishCount = Kamon.metrics().counter(String.format("%s.experiment.publish-count", getClass().getCanonicalName())); + private Counter experimentLaunchPublishCount = Kamon.metrics().counter(String.format("%s.experiment_launch.publish-count", getClass().getCanonicalName())); + private Counter experimentCancelPublishCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.publish-count", getClass().getCanonicalName())); public AiravataServerHandler() { try { @@ -3645,6 +3650,8 @@ public class AiravataServerHandler implements Airavata.Iface { MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, "LAUNCH.EXP-" + UUID.randomUUID().toString(), gatewayId); messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); experimentPublisher.publish(messageContext); + experimentPublishCount.increment(); + experimentLaunchPublishCount.increment(); } private void submitCancelExperiment(String gatewayId, String experimentId) 
throws AiravataException { @@ -3652,6 +3659,8 @@ public class AiravataServerHandler implements Airavata.Iface { MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_CANCEL, "CANCEL.EXP-" + UUID.randomUUID().toString(), gatewayId); messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); experimentPublisher.publish(messageContext); + experimentPublishCount.increment(); + experimentCancelPublishCount.increment(); } private CredentialStoreService.Client getCredentialStoreServiceClient() throws TException, ApplicationSettingsException { http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 8e285e5..086093c 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -20,6 +20,8 @@ */ package org.apache.airavata.gfac.server; +import kamon.Kamon; +import kamon.metric.instrument.Counter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.AiravataStartupException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -85,6 +87,7 @@ public class GfacServerHandler implements GfacService.Iface { private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>(); private ExecutorService executorService; + private Counter consumedCount = Kamon.metrics().counter(String.format("%s.consumed-count", getClass().getCanonicalName())); public 
GfacServerHandler() throws AiravataStartupException { try { @@ -188,6 +191,7 @@ public class GfacServerHandler implements GfacService.Iface { public void onMessage(MessageContext messageContext) { MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId()); + consumedCount.increment(); log.info(" Message Received with message id {} and with message type: {}" + messageContext.getMessageId(), messageContext.getType()); if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) { ProcessStatus status = new ProcessStatus(); http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 3438475..38f4e97 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -20,6 +20,8 @@ */ package org.apache.airavata.orchestrator.core.impl; +import kamon.Kamon; +import kamon.metric.instrument.Counter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index df5865e..b617016 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -21,6 +21,8 @@ package org.apache.airavata.orchestrator.server; +import kamon.Kamon; +import kamon.metric.instrument.Counter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logging.MDCConstants; @@ -85,6 +87,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private final Subscriber statusSubscribe; private final Subscriber experimentSubscriber; private CuratorFramework curatorClient; + private Counter publishCount = Kamon.metrics().counter(String.format("%s.publish-count", getClass().getCanonicalName())); + private Counter publishFail = Kamon.metrics().counter(String.format("%s.publish-fail-count", getClass().getCanonicalName())); + private Counter processConsumeCount = Kamon.metrics().counter(String.format("%s.process.consume-count", getClass().getCanonicalName())); + private Counter experimentConsumeCount = Kamon.metrics().counter(String.format("%s.experiment.consume-count", getClass().getCanonicalName())); + private Counter experimentLaunchConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_launch.consume-count", getClass().getCanonicalName())); + private Counter experimentCancelConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.consume-count", getClass().getCanonicalName())); + private Counter unsupportedMessageCount = 
Kamon.metrics().counter(String.format("%s.unsupported-count", getClass().getCanonicalName())); /** * Query orchestrator server to fetch the CPI version @@ -457,7 +466,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS, AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId); for (String processId : processIds) { - launchProcess(processId, airavataCredStoreToken, gatewayId); + if (launchProcess(processId, airavataCredStoreToken, gatewayId)) { + publishCount.increment(); + } else { + publishFail.increment(); + } } // ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); // status.setReason("submitted all processes"); @@ -492,6 +505,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { @Override public void onMessage(MessageContext message) { if (message.getType().equals(MessageType.PROCESS)) { + processConsumeCount.increment(); try { ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(); TBase event = message.getEvent(); @@ -583,7 +597,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { "Error" + " while prcessing process status change event"); } } else { - System.out.println("Message Recieved with message id " + message.getMessageId() + " and with message " + + log.info("Message Recieved with message id " + message.getMessageId() + " and with message " + "type " + message.getType().name()); } } @@ -595,14 +609,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { @Override public void onMessage(MessageContext messageContext) { MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId()); + experimentConsumeCount.increment(); switch (messageContext.getType()) { case EXPERIMENT: + experimentLaunchConsumeCount.increment(); launchExperiment(messageContext); break; case EXPERIMENT_CANCEL: + 
experimentCancelConsumeCount.increment(); cancelExperiment(messageContext); break; default: + unsupportedMessageCount.increment(); experimentSubscriber.sendAck(messageContext.getDeliveryTag()); log.error("Orchestrator got un-support message type : " + messageContext.getType()); break;
