Repository: airavata Updated Branches: refs/heads/lahiru/AIRAVATA-2017 [created] 97247e39a
[AIRAVATA-2107] Introduce metrics to Airavata 1. Add kamon.io as a metrics library to capture metrics. 2. Add sample metrics to Airavata 3. Push calculated metrics to datadog as a stats viewer. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/de75aa9b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/de75aa9b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/de75aa9b Branch: refs/heads/lahiru/AIRAVATA-2017 Commit: de75aa9be20348135a61371356d3f42d5b9a81f0 Parents: 8eea17f Author: Lahiru Ginnaliya Gamathige <[email protected]> Authored: Mon Sep 19 23:06:20 2016 -0700 Committer: Lahiru Ginnaliya Gamathige <[email protected]> Committed: Sat Oct 1 09:24:43 2016 -0700 ---------------------------------------------------------------------- modules/distribution/pom.xml | 10 ++++++++ modules/gfac/gfac-core/pom.xml | 4 +++ modules/gfac/gfac-impl/pom.xml | 4 +++ .../airavata/gfac/impl/HPCRemoteCluster.java | 21 +++++++++++++-- .../org/apache/airavata/gfac/impl/SSHUtils.java | 15 +++++++++++ .../gfac/monitor/email/EmailBasedMonitor.java | 21 ++++++++++++--- modules/orchestrator/orchestrator-core/pom.xml | 4 +++ modules/registry/registry-core/pom.xml | 27 ++++++++++++++++++++ modules/server/pom.xml | 4 +++ .../org/apache/airavata/server/ServerMain.java | 5 +++- pom.xml | 11 ++++++++ 11 files changed, 120 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml index d096739..296c60c 100644 --- a/modules/distribution/pom.xml +++ b/modules/distribution/pom.xml @@ -567,6 +567,16 @@ <artifactId>airavata-client-samples</artifactId> <version>${project.version}</version> </dependency> + <dependency> + 
<groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + <version>${kamon.version}</version> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-datadog_2.11</artifactId> + <version>${kamon.version}</version> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml index 8d358ff..8e1329d 100644 --- a/modules/gfac/gfac-core/pom.xml +++ b/modules/gfac/gfac-core/pom.xml @@ -128,6 +128,10 @@ <artifactId>curator-framework</artifactId> <version>${curator.version}</version> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/pom.xml b/modules/gfac/gfac-impl/pom.xml index 2a0a949..ba512cd 100644 --- a/modules/gfac/gfac-impl/pom.xml +++ b/modules/gfac/gfac-impl/pom.xml @@ -122,5 +122,9 @@ <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index 725b6d0..9c97d37 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -25,6 +25,8 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.UserInfo; +import kamon.Kamon; +import kamon.metric.instrument.Counter; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.gfac.core.JobManagerConfiguration; import org.apache.airavata.gfac.core.SSHApiException; @@ -51,6 +53,14 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ private final SSHKeyAuthentication authentication; private final JSch jSch; private Session session; + private Counter submittedJobCount = Kamon.metrics().counter(String.format("%s.submitted-jobs", getClass().getCanonicalName())); + private Counter nonZeroExitCodeJobCount = Kamon.metrics().counter(String.format("%s.nonzero-exit-jobs", getClass().getCanonicalName())); + private Counter emptyJobIdCount = Kamon.metrics().counter(String.format("%s.empty-jobid-jobs", getClass().getCanonicalName())); + private Counter copyToFailCount = Kamon.metrics().counter(String.format("%s.copyTo-fail", getClass().getCanonicalName())); + private Counter copyFromFailCount = Kamon.metrics().counter(String.format("%s.copyFrom-fail", getClass().getCanonicalName())); + private Counter mkDirFailCount = Kamon.metrics().counter(String.format("%s.mkDir-fail", getClass().getCanonicalName())); + private Counter listFailCount = Kamon.metrics().counter(String.format("%s.list-fail", getClass().getCanonicalName())); + public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) throws AiravataException { @@ -90,6 +100,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand()); StandardOutReader reader = new StandardOutReader(); executeCommand(submitCommand, reader); + 
submittedJobCount.increment(); // throwExceptionOnError(reader, submitCommand); jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString())); if (jsoutput.getJobId() == null) { @@ -97,6 +108,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ jsoutput.setJobSubmissionFailed(true); jsoutput.setFailureReason("stdout : " + reader.getStdOutputString() + "\n stderr : " + reader.getStdErrorString()); + emptyJobIdCount.increment(); } } jsoutput.setExitCode(reader.getExitCode()); @@ -104,6 +116,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ jsoutput.setJobSubmissionFailed(true); jsoutput.setFailureReason("stdout : " + reader.getStdOutputString() + "\n stderr : " + reader.getStdErrorString()); + nonZeroExitCodeJobCount.increment(); } jsoutput.setStdOut(reader.getStdOutputString()); jsoutput.setStdErr(reader.getStdErrorString()); @@ -120,6 +133,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ SSHUtils.scpTo(localFile, remoteFile, session); retry = 0; } catch (Exception e) { + copyToFailCount.increment(); retry--; try { session = Factory.getSSHSession(authenticationInfo, serverInfo); @@ -147,6 +161,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ SSHUtils.scpFrom(remoteFile, localFile, session); retry=0; } catch (Exception e) { + copyFromFailCount.increment(); retry--; try { session = Factory.getSSHSession(authenticationInfo, serverInfo); @@ -205,6 +220,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ SSHUtils.makeDirectory(directoryPath, session); break; // Exit while loop } catch (JSchException e) { + mkDirFailCount.increment(); if (retryCount == MAX_RETRY_COUNT) { log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for creating directory: " + serverInfo.getHost() + ":" + directoryPath, e); @@ -263,6 +279,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath); return 
SSHUtils.listDirectory(directoryPath, session); } catch (JSchException | AiravataException | IOException e) { + listFailCount.increment(); throw new SSHApiException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath, e); } } @@ -302,7 +319,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ // noting to do }else if ((stdErrorString.contains(command.trim()) && !stdErrorString.contains("Warning")) || stdErrorString .contains("error")) { - log.error("Command {} , Standard Error output {}", command, stdErrorString); + log.error(String.format("Command %s , Standard Error output %s", command, stdErrorString)); throw new SSHApiException("Error running command " + command + " on remote cluster. StandardError: " + stdErrorString); } @@ -322,7 +339,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ channelExec.setInputStream(null); channelExec.setErrStream(commandOutput.getStandardError()); channelExec.connect(); - log.info("Executing command {}", commandInfo.getCommand()); + log.info(String.format("Executing command %s", commandInfo.getCommand())); commandOutput.onOutput(channelExec); break; // exit from while loop } catch (JSchException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java index cd5651e..2f59828 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java @@ -24,6 +24,9 @@ import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; +import kamon.Kamon; +import 
kamon.metric.instrument.Counter; +import kamon.metric.instrument.Histogram; import org.apache.airavata.gfac.core.SSHApiException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +45,11 @@ import java.util.List; */ public class SSHUtils { private static final Logger log = LoggerFactory.getLogger(SSHUtils.class); + private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getCanonicalName())); + private static Counter scpToFailedCount = Kamon.metrics().counter(String.format("%s.scpTo-fail", SSHUtils.class.getCanonicalName())); + private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getCanonicalName())); + private static Counter scpFromFailedCount = Kamon.metrics().counter(String.format("%s.scpFrom-fail", SSHUtils.class.getCanonicalName())); /** * This will copy a local file to a remote location @@ -79,6 +86,7 @@ public class SSHUtils { if (checkAck(in) != 0) { String error = "Error Reading input Stream"; log.error(error); + scpToFailedCount.increment(); throw new SSHApiException(error); } @@ -94,6 +102,7 @@ public class SSHUtils { if (checkAck(in) != 0) { String error = "Error Reading input Stream"; log.error(error); + scpToFailedCount.increment(); throw new SSHApiException(error); } } @@ -112,6 +121,7 @@ public class SSHUtils { if (checkAck(in) != 0) { String error = "Error Reading input Stream"; log.error(error); + scpToFailedCount.increment(); throw new SSHApiException(error); } @@ -122,6 +132,7 @@ public class SSHUtils { int len = fis.read(buf, 0, buf.length); if (len <= 0) break; out.write(buf, 0, len); //out.flush(); + scpToBytes.record(command.getBytes().length); } fis.close(); fis = null; @@ -131,6 +142,7 @@ public class SSHUtils { out.flush(); if (checkAck(in) != 0) { String error = "Error Reading input Stream"; + scpToFailedCount.increment(); log.error(error); throw new SSHApiException(error); } @@ -140,6 +152,7 @@ public class 
SSHUtils { channel.disconnect(); if (stdOutReader.getStdErrorString().contains("scp:")) { + scpToFailedCount.increment(); throw new SSHApiException(stdOutReader.getStdErrorString()); } //since remote file is always a file we just return the file @@ -232,6 +245,7 @@ public class SSHUtils { } fos.write(buf, 0, foo); filesize -= foo; + scpFromBytes.record(foo); if (filesize == 0L) break; } fos.close(); @@ -254,6 +268,7 @@ public class SSHUtils { } } catch (Exception e) { + scpFromFailedCount.increment(); log.error(e.getMessage(), e); } finally { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index bbcd635..d1afdd6 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -20,6 +20,9 @@ */ package org.apache.airavata.gfac.monitor.email; +import kamon.Kamon; +import kamon.metric.instrument.Counter; +import kamon.metric.instrument.Histogram; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; @@ -71,12 +74,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ private Message[] flushUnseenMessages; private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>(); private Timer timer; - + private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getCanonicalName())); + private Histogram cancelledJobs = 
Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getCanonicalName())); + private Counter completedJobCount = Kamon.metrics().counter(String.format("%s.completed-jobs", getClass().getCanonicalName())); + private Counter failedJobCount = Kamon.metrics().counter(String.format("%s.failed-jobs", getClass().getCanonicalName())); public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException { init(); populateAddressAndParserMap(resourceConfigs); - } + } private void init() throws AiravataException { host = ServerSettings.getEmailBasedMonitorHost(); @@ -119,13 +125,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ public void monitor(String jobId, TaskContext taskContext) { log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId); jobMonitorMap.put(jobId, taskContext); + monitorQueueSize.record(jobMonitorMap.size()); taskContext.getParentProcessContext().setPauseTaskExecution(true); } @Override public void stopMonitor(String jobId, boolean runOutflow) { TaskContext taskContext = jobMonitorMap.remove(jobId); - if (taskContext != null && runOutflow) { + monitorQueueSize.record(jobMonitorMap.size()); + if (taskContext != null && runOutflow) { try { ProcessContext pc = taskContext.getParentProcessContext(); if (taskContext.isCancel()) { @@ -157,6 +165,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ @Override public void canceledJob(String jobId) { canceledJobs.put(jobId, Boolean.FALSE); + cancelledJobs.record(canceledJobs.size()); } private JobStatusResult parse(Message message) throws MessagingException, AiravataException { @@ -330,6 +339,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ private void process(JobStatusResult jobStatusResult, TaskContext taskContext){ canceledJobs.remove(jobStatusResult.getJobId()); + cancelledJobs.record(canceledJobs.size()); JobState resultState = jobStatusResult.getState(); // TODO : update job 
state on process context boolean runOutflowTasks = false; @@ -340,6 +350,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ // TODO - Handle all other valid JobStates if (resultState == JobState.COMPLETE) { jobMonitorMap.remove(jobStatusResult.getJobId()); + monitorQueueSize.record(jobMonitorMap.size()); + completedJobCount.increment(); jobStatus.setJobState(JobState.COMPLETE); jobStatus.setReason("Complete email received"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); @@ -359,6 +371,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ log.info("[EJM]: Job Active email received, " + jobDetails); }else if (resultState == JobState.FAILED) { jobMonitorMap.remove(jobStatusResult.getJobId()); + monitorQueueSize.record(jobMonitorMap.size()); + failedJobCount.increment(); runOutflowTasks = true; jobStatus.setJobState(JobState.FAILED); jobStatus.setReason("Failed email received"); @@ -366,6 +380,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails); }else if (resultState == JobState.CANCELED) { jobMonitorMap.remove(jobStatusResult.getJobId()); + monitorQueueSize.record(jobMonitorMap.size()); jobStatus.setJobState(JobState.CANCELED); jobStatus.setReason("Canceled email received"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/orchestrator/orchestrator-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml index ee0d23a..8e21b6d 100644 --- a/modules/orchestrator/orchestrator-core/pom.xml +++ b/modules/orchestrator/orchestrator-core/pom.xml @@ -98,6 +98,10 @@ the License. 
--> <artifactId>airavata-server-configuration</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/registry/registry-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/pom.xml b/modules/registry/registry-core/pom.xml index c2b26b2..8aa80c1 100644 --- a/modules/registry/registry-core/pom.xml +++ b/modules/registry/registry-core/pom.xml @@ -102,6 +102,33 @@ <build> <plugins> + <!--plugin> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-maven-plugin</artifactId> + <version>2.2.0</version> + <configuration> + <includes>**/entities/*.class</includes> + <excludes>**/entities/XML*.class</excludes> + <addDefaultConstructor>true</addDefaultConstructor> + <enforcePropertyRestrictions>true</enforcePropertyRestrictions> + </configuration> + <executions> + <execution> + <id>enhancer</id> + <phase>process-classes</phase> + <goals> + <goal>enhance</goal> + </goals> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa</artifactId> + <version>2.2.0</version> + </dependency> + </dependencies> + </plugin--> <!--<plugin>--> <!--<groupId>org.apache.maven.plugins</groupId>--> <!--<artifactId>maven-antrun-plugin</artifactId>--> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/server/pom.xml ---------------------------------------------------------------------- diff --git a/modules/server/pom.xml b/modules/server/pom.xml index d306c83..8c4b060 100644 --- a/modules/server/pom.xml +++ b/modules/server/pom.xml @@ -60,5 +60,9 @@ <artifactId>zookeeper</artifactId> <version>3.4.0</version> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + </dependency> </dependencies> 
</project> http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index 1c0483d..8660974 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -23,6 +23,7 @@ package org.apache.airavata.server; import ch.qos.logback.classic.LoggerContext; import org.apache.airavata.api.Airavata; import org.apache.airavata.common.exception.AiravataException; +import kamon.Kamon; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logging.kafka.KafkaAppender; import org.apache.airavata.common.utils.*; @@ -44,6 +45,7 @@ import java.util.Arrays; import java.util.List; public class ServerMain { + private static List<IServer> servers; private static final String SERVERS_KEY="servers"; private final static Logger logger = LoggerFactory.getLogger(ServerMain.class); @@ -160,6 +162,7 @@ public class ServerMain { // } public static void main(String args[]) throws ParseException, IOException, AiravataException { + Kamon.start(); ServerSettings.mergeSettingsCommandLineArgs(args); ServerSettings.setServerRoles(ApplicationSettings.getSetting(SERVERS_KEY, "all").split(",")); @@ -423,4 +426,4 @@ public class ServerMain { return -1; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a65df09..bf5f592 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ <maven.replacer.plugin.version>1.5.3</maven.replacer.plugin.version> 
<kafka-clients.version>0.8.2.2</kafka-clients.version> <logback.version>1.1.6</logback.version> + <kamon.version>0.6.0</kamon.version> </properties> <developers> @@ -446,6 +447,16 @@ <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-core_2.11</artifactId> + <version>${kamon.version}</version> + </dependency> + <dependency> + <groupId>io.kamon</groupId> + <artifactId>kamon-datadog_2.11</artifactId> + <version>${kamon.version}</version> + </dependency> </dependencies> </dependencyManagement>
