http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java new file mode 100644 index 0000000..884ccd5 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.local.handler; + +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + + +public class LocalInputHandler extends AbstractHandler { + private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class); + @Override + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + super.invoke(jobExecutionContext); + Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters(); + for (Map.Entry<String, Object> inputParamEntry : inputParameters.entrySet()) { + if (inputParamEntry.getValue() instanceof InputDataObjectType) { + InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue(); + if (inputDataObject.getType() == DataType.URI + && inputDataObject != null + && !inputDataObject.getValue().equals("")) { + try { + inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue())); + } catch (IOException e) { + throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue()); + } + } + } + } + } + + private String stageFile(String inputDir, String sourceFilePath) throws IOException { + int i = sourceFilePath.lastIndexOf(File.separator); + String substring = sourceFilePath.substring(i + 1); + if (inputDir.endsWith("/")) { + inputDir = inputDir.substring(0, inputDir.length() - 1); + } + String targetFilePath = inputDir + File.separator + substring; + + if (sourceFilePath.startsWith("file")) { + sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length()); + } + + File sourceFile = new File(sourceFilePath); + File targetFile = new File(targetFilePath); + if (targetFile.exists()) { + targetFile.delete(); + } + logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath); + FileUtils.copyFile(sourceFile, targetFile); + + return targetFilePath; + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + + } + + @Override + public void initProperties(Properties properties) throws GFacHandlerException { + + } +}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java new file mode 100644 index 0000000..5519ee0 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -0,0 +1,311 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.local.provider.impl; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; +import org.apache.airavata.gfac.core.provider.AbstractProvider; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.impl.OutputUtils; +import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; +import org.apache.airavata.gfac.local.utils.InputUtils; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.workspace.experiment.JobDetails; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.xmlbeans.XmlException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +public class LocalProvider extends AbstractProvider { + private static final Logger log = LoggerFactory.getLogger(LocalProvider.class); + private ProcessBuilder builder; + private List<String> cmdList; + private String jobId; + + public static class LocalProviderJobData{ + private String applicationName; + private List<String> inputParameters; + private String workingDir; + private String inputDir; + private String outputDir; + public String getApplicationName() { + return applicationName; + } + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } + public List<String> getInputParameters() { + return inputParameters; + } + public void setInputParameters(List<String> inputParameters) { + this.inputParameters = inputParameters; + } + public String getWorkingDir() { + return workingDir; + } + public void setWorkingDir(String workingDir) { + this.workingDir = workingDir; + } + public String getInputDir() { + return inputDir; + } + public void setInputDir(String inputDir) { + this.inputDir = inputDir; + } + public String getOutputDir() { + return outputDir; + } + public void setOutputDir(String outputDir) { + this.outputDir = outputDir; + } + } + public LocalProvider(){ + cmdList = new ArrayList<String>(); + } + + public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { + super.initialize(jobExecutionContext); + + // build command with all inputs + buildCommand(); + initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription()); + + // extra environment variables + builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); + builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); + + // set working directory + builder.directory(new File(jobExecutionContext.getWorkingDir())); + + // log info + log.info("Command = " + InputUtils.buildCommand(cmdList)); + log.info("Working dir = " + builder.directory()); + /*for (String key : builder.environment().keySet()) { + log.info("Env[" + key + "] = " + builder.environment().get(key)); + }*/ + } + + public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + JobDetails jobDetails = new JobDetails(); + try { + jobId = jobExecutionContext.getTaskData().getTaskID(); + jobDetails.setJobID(jobId); + jobDetails.setJobDescription(jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription().getAppDeploymentDescription()); + jobExecutionContext.setJobDetails(jobDetails); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP); + // running cmd + Process process = builder.start(); + + Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput()); + Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError()); + + // start output threads + standardOutWriter.setDaemon(true); + standardErrorWriter.setDaemon(true); + standardOutWriter.start(); + standardErrorWriter.start(); + + int returnValue = process.waitFor(); + + // make sure other two threads are done + standardOutWriter.join(); + standardErrorWriter.join(); + + /* + * check return value. usually not very helpful to draw conclusions based on return values so don't bother. + * just provide warning in the log messages + */ + if (returnValue != 0) { + log.error("Process finished with non zero return value. Process may have failed"); + } else { + log.info("Process finished with return value of zero."); + } + + StringBuffer buf = new StringBuffer(); + buf.append("Executed ").append(InputUtils.buildCommand(cmdList)) + .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir()) + .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ") + .append(String.valueOf(returnValue)); + + log.info(buf.toString()); + + // updating the job status to complete because there's nothing to monitor in local jobs +// MonitorID monitorID = createMonitorID(jobExecutionContext); + JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()); + jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); + } catch (IOException io) { + throw new GFacProviderException(io.getMessage(), io); + } catch (InterruptedException e) { + throw new GFacProviderException(e.getMessage(), e); + }catch (GFacException e) { + throw new GFacProviderException(e.getMessage(), e); + } + } + +// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) { +// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId, +// jobExecutionContext.getTaskData().getTaskID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), +// jobExecutionContext.getExperiment().getUserName(),jobId); +// return monitorID; +// } + +// private void saveApplicationJob(JobExecutionContext jobExecutionContext) +// throws GFacProviderException { +// ApplicationDeploymentDescriptionType app = jobExecutionContext. +// getApplicationContext().getApplicationDeploymentDescription().getType(); +// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext); +// appJob.setJobId(jobId); +// LocalProviderJobData data = new LocalProviderJobData(); +// data.setApplicationName(app.getExecutableLocation()); +// data.setInputDir(app.getInputDataDirectory()); +// data.setOutputDir(app.getOutputDataDirectory()); +// data.setWorkingDir(builder.directory().toString()); +// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext)); +// ByteArrayOutputStream stream = new ByteArrayOutputStream(); +// JAXB.marshal(data, stream); +// appJob.setJobData(stream.toString()); +// appJob.setSubmittedTime(Calendar.getInstance().getTime()); +// appJob.setStatus(ApplicationJobStatus.SUBMITTED); +// appJob.setStatusUpdateTime(appJob.getSubmittedTime()); +// GFacUtils.recordApplicationJob(jobExecutionContext, appJob); +// } + + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { + try { + List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); + String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput()); + String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError()); + Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); + TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); + if (taskDetails != null){ + taskDetails.setApplicationOutputs(outputArray); + registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); + } + registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()); + jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); + } catch (XmlException e) { + throw new GFacProviderException("Cannot read output:" + e.getMessage(), e); + } catch (IOException io) { + throw new GFacProviderException(io.getMessage(), io); + } catch (Exception e){ + throw new GFacProviderException("Error in retrieving results",e); + } + } + + public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { + throw new NotImplementedException(); + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + // TODO: Auto generated method body. + } + + @Override + public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + // TODO: Auto generated method body. + } + + + private void buildCommand() { + cmdList.add(jobExecutionContext.getExecutablePath()); + Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters(); + + // sort the inputs first and then build the command List + Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() { + @Override + public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { + return inputDataObjectType.getInputOrder() - t1.getInputOrder(); + } + }; + Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator); + for (Object object : inputParameters.values()) { + if (object instanceof InputDataObjectType) { + InputDataObjectType inputDOT = (InputDataObjectType) object; + sortedInputSet.add(inputDOT); + } + } + for (InputDataObjectType inputDataObjectType : sortedInputSet) { + if (inputDataObjectType.getApplicationArgument() != null + && !inputDataObjectType.getApplicationArgument().equals("")) { + cmdList.add(inputDataObjectType.getApplicationArgument()); + } + + if (inputDataObjectType.getValue() != null + && !inputDataObjectType.getValue().equals("")) { + cmdList.add(inputDataObjectType.getValue()); + } + } + + } + + private void initProcessBuilder(ApplicationDeploymentDescription app){ + builder = new ProcessBuilder(cmdList); + + List<SetEnvPaths> setEnvironment = app.getSetEnvironment(); + if (setEnvironment != null) { + for (SetEnvPaths envPath : setEnvironment) { + Map<String,String> builderEnv = builder.environment(); + builderEnv.put(envPath.getName(), envPath.getValue()); + } + } + } + + public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java new file mode 100644 index 0000000..2467ce8 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.local.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +public class InputStreamToFileWriter extends Thread{ + protected final Logger log = LoggerFactory.getLogger(this.getClass()); + + private BufferedReader in; + private BufferedWriter out; + + public InputStreamToFileWriter(InputStream in, String out) throws IOException { + this.in = new BufferedReader(new InputStreamReader(in)); + this.out = new BufferedWriter(new FileWriter(out)); + } + + public void run() { + try { + String line = null; + while ((line = in.readLine()) != null) { + if (log.isDebugEnabled()) { + log.debug(line); + } + out.write(line); + out.newLine(); + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + if (in != null) { + try { + in.close(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + if (out != null) { + try { + out.close(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java new file mode 100644 index 0000000..98671fd --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.local.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class InputUtils { + + private static final Logger logger = LoggerFactory.getLogger(InputUtils.class); + + private static final String SPACE = " "; + + private InputUtils() { + } + + public static String buildCommand(List<String> cmdList) { + StringBuffer buff = new StringBuffer(); + for (String string : cmdList) { + logger.debug("Build Command --> " + string); + buff.append(string); + buff.append(SPACE); + } + return buff.toString(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java new file mode 100644 index 0000000..2b45df7 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.local.utils; + +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class LocalProviderUtil { + private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class); + + private void makeFileSystemDir(String dir) throws GFacProviderException { + File f = new File(dir); + if (f.isDirectory() && f.exists()) { + return; + } else if (!new File(dir).mkdir()) { + throw new GFacProviderException("Cannot make directory " + dir); + } + } + + public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException { + log.info("working diectroy = " + jobExecutionContext.getWorkingDir()); + log.info("temp directory = " + jobExecutionContext.getScratchLocation()); + makeFileSystemDir(jobExecutionContext.getWorkingDir()); + makeFileSystemDir(jobExecutionContext.getScratchLocation()); + makeFileSystemDir(jobExecutionContext.getInputDir()); + makeFileSystemDir(jobExecutionContext.getOutputDir()); + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java new file mode 100644 index 0000000..8eba250 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java @@ -0,0 +1,107 @@ +package org.apache.airavata.gfac.monitor;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.SecurityContext; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.api.ServerInfo; +import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.Date; + +public class HPCMonitorID extends MonitorID { + private final static Logger logger = LoggerFactory.getLogger(HPCMonitorID.class); + + + private AuthenticationInfo authenticationInfo = null; + + public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, + String experimentID, String userName,String jobName) { + super(computeResourceDescription, jobID, taskID, workflowNodeID, experimentID, userName,jobName); + setComputeResourceDescription(computeResourceDescription); + setJobStartedTime(new Timestamp((new Date()).getTime())); + setUserName(userName); + setJobID(jobID); + setTaskID(taskID); + setExperimentID(experimentID); + setWorkflowNodeID(workflowNodeID); + } + + public HPCMonitorID(AuthenticationInfo authenticationInfo, JobExecutionContext jobExecutionContext) { + super(jobExecutionContext); + this.authenticationInfo = authenticationInfo; + if (this.authenticationInfo != null) { + try { + String hostAddress = jobExecutionContext.getHostName(); + SecurityContext securityContext = jobExecutionContext.getSecurityContext(hostAddress); + ServerInfo serverInfo = null; + if (securityContext != null) { + if (securityContext instanceof GSISecurityContext){ + serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo(); + if (serverInfo.getUserName() != null) { + setUserName(serverInfo.getUserName()); + } + } + if (securityContext instanceof SSHSecurityContext){ + serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo(); + if (serverInfo.getUserName() != null) { + setUserName(serverInfo.getUserName()); + } + } + } + } catch (GFacException e) { + logger.error("Error while getting security context", e); + } + } + } + + public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName, AuthenticationInfo authenticationInfo) { + setComputeResourceDescription(computeResourceDescription); + setJobStartedTime(new Timestamp((new Date()).getTime())); + this.authenticationInfo = authenticationInfo; + // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user + if (this.authenticationInfo != null) { + if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) { + setUserName(((MyProxyAuthenticationInfo) this.authenticationInfo).getUserName()); + } + } + setJobID(jobID); + setTaskID(taskID); + setExperimentID(experimentID); + setWorkflowNodeID(workflowNodeID); + } + + public AuthenticationInfo getAuthenticationInfo() { + return authenticationInfo; + } + + public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { + this.authenticationInfo = authenticationInfo; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java new file mode 100644 index 0000000..f29e3e6 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor; + +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; + +import java.util.ArrayList; +import java.util.List; + +public class HostMonitorData { +// private HostDescription host; + private ComputeResourceDescription computeResourceDescription; + private JobSubmissionProtocol jobSubmissionProtocol; + private DataMovementProtocol dataMovementProtocol; + + private List<MonitorID> monitorIDs; + + public HostMonitorData(JobExecutionContext jobExecutionContext) { + this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription(); + this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol(); + this.monitorIDs = new ArrayList<MonitorID>(); + } + + public HostMonitorData(JobExecutionContext jobExecutionContext, List<MonitorID> monitorIDs) { + this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription(); + this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol(); + this.monitorIDs = monitorIDs; + } + + public ComputeResourceDescription getComputeResourceDescription() { + return computeResourceDescription; + } + + public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) { + this.computeResourceDescription = computeResourceDescription; + } + + public List<MonitorID> getMonitorIDs() { + return monitorIDs; + } + + public void setMonitorIDs(List<MonitorID> monitorIDs) { + this.monitorIDs = monitorIDs; + } + + /** + * this method get called by CommonUtils and it will check the right place before adding + * so there will not be a mismatch between this.host and monitorID.host + * @param monitorID + * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException + */ + public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException { + monitorIDs.add(monitorID); + } + + public JobSubmissionProtocol getJobSubmissionProtocol() { + return jobSubmissionProtocol; + } + + public DataMovementProtocol getDataMovementProtocol() { + return dataMovementProtocol; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java new file mode 100644 index 0000000..022d17c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor; + +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is the datastructure to keep the user centric job data, rather keeping + * the individual jobs we keep the jobs based on the each user + */ +public class UserMonitorData { + private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class); + + private String userName; + + private List<HostMonitorData> hostMonitorData; + + + public UserMonitorData(String userName) { + this.userName = userName; + hostMonitorData = new ArrayList<HostMonitorData>(); + } + + public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) { + this.hostMonitorData = hostMonitorDataList; + this.userName = userName; + } + + public List<HostMonitorData> getHostMonitorData() { + return hostMonitorData; + } + + public void setHostMonitorData(List<HostMonitorData> hostMonitorData) { + this.hostMonitorData = hostMonitorData; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + /* + This method will add element to the MonitorID list, user should not + duplicate it, we do not check it because its going to be used by airavata + so we have to use carefully and this method will add a host if its a new host + */ + public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException { + this.hostMonitorData.add(hostMonitorData); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java new file mode 100644 index 0000000..f19decf --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.monitor.command; + +public class ExperimentCancelRequest { + private String experimentId; + + public ExperimentCancelRequest(String experimentId) { + this.experimentId = experimentId; + } + + public String getExperimentId() { + return experimentId; + } + + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java new file mode 100644 index 0000000..b45e01c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.monitor.command; + +public class TaskCancelRequest { + private String experimentId; + private String nodeId; + private String taskId; + + public TaskCancelRequest(String experimentId, String nodeId, String taskId) { + this.experimentId = experimentId; + this.setNodeId(nodeId); + this.taskId = taskId; + } + public String getExperimentId() { + return experimentId; + } + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } + public String getTaskId() { + return taskId; + } + public void setTaskId(String taskId) { + this.taskId = taskId; + } + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java new file mode 100644 index 0000000..b4ac3a9 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.core; + +import org.apache.airavata.common.utils.MonitorPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the abstract Monitor which needs to be used by + * any Monitoring implementation which expect nto consume + * to store the status to registry. Because they have to + * use the MonitorPublisher to publish the monitoring statuses + * to the Event Bus. All the Monitor statuses publish to the eventbus + * will be saved to the Registry. + */ +public abstract class AiravataAbstractMonitor implements Monitor { + private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java new file mode 100644 index 0000000..a003f55 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.core; + +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.workspace.experiment.JobState; + +/** + * This is an interface to implement messageparser, it could be + * pull based or push based still monitor has to parse the content of + * the message it gets from remote monitoring system and finalize + * them to internal job state, Ex: JSON parser for AMQP and Qstat reader + * for pull based monitor. + */ +public interface MessageParser { + /** + * This method is to implement how to parse the incoming message + * and implement a logic to finalize the status of the job, + * we have to makesure the correct message is given to the messageparser + * parse method, it will not do any filtering + * @param message content of the message + * @return + */ + JobState parseMessage(String message)throws AiravataMonitorException; +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java new file mode 100644 index 0000000..614d606 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.core; + + +/** + * This is the primary interface for Monitors, + * This can be used to implement different methods of monitoring + */ +public interface Monitor { + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java new file mode 100644 index 0000000..efdf89c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.core; + +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; + +/** + * PullMonitors can implement this interface + * Since the pull and push based monitoring required different + * operations, PullMonitor will be useful. + * This will allow users to program Pull monitors separately + */ +public abstract class PullMonitor extends AiravataAbstractMonitor { + + private int pollingFrequence; + /** + * This method will can invoke when PullMonitor needs to start + * and it has to invoke in the frequency specified below, + * @return if the start process is successful return true else false + */ + public abstract boolean startPulling() throws AiravataMonitorException; + + /** + * This is the method to stop the polling process + * @return if the stopping process is successful return true else false + */ + public abstract boolean stopPulling()throws AiravataMonitorException; + + /** + * this method can be used to set the polling frequencey or otherwise + * can implement a polling mechanism, and implement how to do + * @param frequence + */ + public void setPollingFrequence(int frequence){ + this.pollingFrequence = frequence; + } + + /** + * this method can be used to get the polling frequencey or otherwise + * can implement a polling mechanism, and implement how to do + * @return + */ + public int getPollingFrequence(){ + return this.pollingFrequence; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java new file mode 100644 index 0000000..1b6a228 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.core; + +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; + +/** + * PushMonitors can implement this interface + * Since the pull and push based monitoring required different + * operations, PullMonitor will be useful. + * This interface will allow users to program Push monitors separately + */ +public abstract class PushMonitor extends AiravataAbstractMonitor { + /** + * This method can be invoked to register a listener with the + * remote monitoring system, ideally inside this method users will be + * writing some client listener code for the remote monitoring system, + * this will be a simple wrapper around any client for the remote Monitor. + * @param monitorID + * @return + */ + public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException; + + /** + * This method can be invoked to unregister a listener with the + * remote monitoring system, ideally inside this method users will be + * writing some client listener code for the remote monitoring system, + * this will be a simple wrapper around any client for the remote Monitor. + * @param monitorID + * @return + */ + public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException; + + /** + * This can be used to stop the registration thread + * @return + * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException + */ + public abstract boolean stopRegister()throws AiravataMonitorException; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java new file mode 100644 index 0000000..eea6ef6 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.logger.AiravataLogger; +import org.apache.airavata.common.logger.AiravataLoggerFactory; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.monitor.JobStatusResult; +import org.apache.airavata.gfac.core.monitor.EmailParser; +import org.apache.airavata.gfac.impl.OutHandlerWorker; +import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser; +import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser; +import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser; +import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.model.workspace.experiment.JobStatus; + +import javax.mail.Address; +import javax.mail.Flags; +import javax.mail.Folder; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.Session; +import javax.mail.Store; +import javax.mail.search.FlagTerm; +import javax.mail.search.SearchTerm; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +public class EmailBasedMonitor implements Runnable{ + private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class); + + public static final int COMPARISON = 6; // after and equal + public static final String IMAPS = "imaps"; + public static final String POP3 = "pop3"; + private boolean stopMonitoring = false; + + private Session session ; + private Store store; + private Folder emailFolder; + private Properties properties; + private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>(); + private String host, emailAddress, password, storeProtocol, folderName ; + private Date monitorStartDate; + private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>(); + + public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException { + init(); + } + + private void init() throws AiravataException { + host = ServerSettings.getEmailBasedMonitorHost(); + emailAddress = ServerSettings.getEmailBasedMonitorAddress(); + password = ServerSettings.getEmailBasedMonitorPassword(); + storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol(); + folderName = ServerSettings.getEmailBasedMonitorFolderName(); + if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) { + throw new AiravataException("Unsupported store protocol , expected " + + IMAPS + " or " + POP3 + " but found " + storeProtocol); + } + properties = new Properties(); + properties.put("mail.store.protocol", storeProtocol); + } + + public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) { + String monitorId = jobExecutionContext.getJobDetails().getJobID(); + if (monitorId == null || monitorId.isEmpty()) { + monitorId = jobExecutionContext.getJobDetails().getJobName(); + } + addToJobMonitorMap(monitorId, jobExecutionContext); + } + + public void addToJobMonitorMap(String monitorId, JobExecutionContext jobExecutionContext) { + log.info("[EJM]: Added monitor Id : " + monitorId + " to email based monitor map"); + jobMonitorMap.put(monitorId, jobExecutionContext); + } + + private JobStatusResult parse(Message message) throws MessagingException, AiravataException { + Address fromAddress = message.getFrom()[0]; + String addressStr = fromAddress.toString(); + ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr); + EmailParser emailParser = emailParserMap.get(jobMonitorType); + if (emailParser == null) { + switch (jobMonitorType) { + case PBS: + emailParser = new PBSEmailParser(); + break; + case SLURM: + emailParser = new SLURMEmailParser(); + break; + case LSF: + emailParser = new LSFEmailParser(); + break; + case UGE: + emailParser = new UGEEmailParser(); + break; + default: + throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring --> " + addressStr); + } + + emailParserMap.put(jobMonitorType, emailParser); + } + return emailParser.parseEmail(message); + } + + private ResourceJobManagerType getJobMonitorType(String addressStr) throws AiravataException { + System.out.println("*********** address ******** : " + addressStr); + switch (addressStr) { + case "[email protected]": // trestles , gordan + case "[email protected]": // bigred2 + case "root <[email protected]>": // bigred2 + case "root <[email protected]>": // alamo + return ResourceJobManagerType.PBS; + case "SDSC Admin <[email protected]>": // comet + case "[email protected]": // stampede + case "slurm user <[email protected]>": + return ResourceJobManagerType.SLURM; +// case "lsf": +// return ResourceJobManagerType.LSF; + default: + if (addressStr.contains("ls4.tacc.utexas.edu>")) { // lonestar + return ResourceJobManagerType.UGE; + } else { + throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr); + } + } + + } + + @Override + public void run() { + try { + session = Session.getDefaultInstance(properties); + store = session.getStore(storeProtocol); + store.connect(host, emailAddress, password); + emailFolder = store.getFolder(folderName); + // first time we search for all unread messages. + SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false); + while (!(stopMonitoring || ServerSettings.isStopAllThreads())) { + Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get a rest till job finishes + if (jobMonitorMap.isEmpty()) { + log.info("[EJM]: Job Monitor Map is empty, no need to retrieve emails"); + continue; + } else { + log.info("[EJM]: " + jobMonitorMap.size() + " job/s in job monitor map"); + } + if (!store.isConnected()) { + store.connect(); + emailFolder = store.getFolder(folderName); + } + log.info("[EJM]: Retrieving unseen emails"); + emailFolder.open(Folder.READ_WRITE); + Message[] searchMessages = emailFolder.search(unseenBefore); + if (searchMessages == null || searchMessages.length == 0) { + log.info("[EJM]: No new email messages"); + } else { + log.info("[EJM]: "+searchMessages.length + " new email/s received"); + } + processMessages(searchMessages); + emailFolder.close(false); + } + } catch (MessagingException e) { + log.error("[EJM]: Couldn't connect to the store ", e); + } catch (InterruptedException e) { + log.error("[EJM]: Interrupt exception while sleep ", e); + } catch (AiravataException e) { + log.error("[EJM]: UnHandled arguments ", e); + } finally { + try { + emailFolder.close(false); + store.close(); + } catch (MessagingException e) { + log.error("[EJM]: Store close operation failed, couldn't close store", e); + } + } + } + + private void processMessages(Message[] searchMessages) throws MessagingException { + List<Message> processedMessages = new ArrayList<>(); + List<Message> unreadMessages = new ArrayList<>(); + for (Message message : searchMessages) { + try { + JobStatusResult jobStatusResult = parse(message); + JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId()); + if (jEC == null) { + jEC = jobMonitorMap.get(jobStatusResult.getJobName()); + } + if (jEC != null) { + process(jobStatusResult, jEC); + processedMessages.add(message); + } else { + // we can get JobExecutionContext null in multiple Gfac instances environment, + // where this job is not submitted by this Gfac instance hence we ignore this message. + unreadMessages.add(message); +// log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId()); + } + } catch (AiravataException e) { + log.error("[EJM]: Error parsing email message =====================================>", e); + try { + writeEnvelopeOnError(message); + } catch (MessagingException e1) { + log.error("[EJM]: Error printing envelop of the email"); + } + unreadMessages.add(message); + } catch (MessagingException e) { + log.error("[EJM]: Error while retrieving sender address from message : " + message.toString()); + unreadMessages.add(message); + } + } + if (!processedMessages.isEmpty()) { + Message[] seenMessages = new Message[processedMessages.size()]; + processedMessages.toArray(seenMessages); + try { + emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true); + } catch (MessagingException e) { + if (!store.isConnected()) { + store.connect(); + emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true); + } + } + + } + if (!unreadMessages.isEmpty()) { + Message[] unseenMessages = new Message[unreadMessages.size()]; + unreadMessages.toArray(unseenMessages); + try { + emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false); + } catch (MessagingException e) { + if (!store.isConnected()) { + store.connect(); + emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false); + + } + } + } + } + + private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){ + JobState resultState = jobStatusResult.getState(); + jEC.getJobDetails().setJobStatus(new JobStatus(resultState)); + boolean runOutHandlers = false; + String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId(); + // TODO - Handle all other valid JobStates + if (resultState == JobState.COMPLETE) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutHandlers = true; + log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails); + }else if (resultState == JobState.QUEUED) { + // nothing special thing to do, update the status change to rabbit mq at the end of this method. + log.info("[EJM]: Job Queued email received, " + jobDetails); + }else if (resultState == JobState.ACTIVE) { + // nothing special thing to do, update the status change to rabbit mq at the end of this method. + log.info("[EJM]: Job Active email received, " + jobDetails); + }else if (resultState == JobState.FAILED) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutHandlers = true; + log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails); + }else if (resultState == JobState.CANCELED) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutHandlers = false; // Do we need to run out handlers in canceled case? + log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails); + + } + log.info("[EJM]: Publishing status changes to amqp. " + jobDetails); + publishJobStatusChange(jEC); + + if (runOutHandlers) { + log.info("[EJM]: Calling Out Handler chain of " + jobDetails); + GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC)); + } + } + + private void publishJobStatusChange(JobExecutionContext jobExecutionContext) { + JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); + JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()); + jobStatus.setJobIdentity(jobIdentity); + jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState()); + // we have this JobStatus class to handle amqp monitoring + log.debugId(jobStatus.getJobIdentity().getJobId(), "[EJM]: Published job status(" + + jobExecutionContext.getJobDetails().getJobStatus().getJobState().toString() + ") change request, " + + "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), + jobStatus.getJobIdentity().getTaskId()); + + jobExecutionContext.getMonitorPublisher().publish(jobStatus); + } + + private void writeEnvelopeOnError(Message m) throws MessagingException { + Address[] a; + // FROM + if ((a = m.getFrom()) != null) { + for (int j = 0; j < a.length; j++) + log.error("FROM: " + a[j].toString()); + } + // TO + if ((a = m.getRecipients(Message.RecipientType.TO)) != null) { + for (int j = 0; j < a.length; j++) + log.error("TO: " + a[j].toString()); + } + // SUBJECT + if (m.getSubject() != null) + log.error("SUBJECT: " + m.getSubject()); + } + + public void stopMonitoring() { + stopMonitoring = true; + } + + public void setDate(Date date) { + this.monitorStartDate = date; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java new file mode 100644 index 0000000..3a75331 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; + +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class EmailMonitorFactory { + + private static EmailBasedMonitor emailBasedMonitor; + private static Date startMonitorDate = Calendar.getInstance().getTime(); + + public static EmailBasedMonitor getEmailBasedMonitor(ResourceJobManagerType resourceJobManagerType) throws AiravataException { + if (emailBasedMonitor == null) { + synchronized (EmailMonitorFactory.class){ + if (emailBasedMonitor == null) { + emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType); + emailBasedMonitor.setDate(startMonitorDate); + new Thread(emailBasedMonitor).start(); + } + } + } + return emailBasedMonitor; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java new file mode 100644 index 0000000..1b5a027 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email.parser; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.gfac.core.monitor.EmailParser; +import org.apache.airavata.gfac.core.monitor.JobStatusResult; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.Message; +import javax.mail.MessagingException; +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class LSFEmailParser implements EmailParser { + private static final Logger log = LoggerFactory.getLogger(LSFEmailParser.class); + //[email protected] + private static final String SIGNAL = "signal"; + private static final String LONESTAR_REGEX = "Job (?<" + JOBID + ">\\d+) \\(.*\\) (?<" + STATUS + + ">.*)\\s[a-zA-Z =]+(?<" + EXIT_STATUS + ">\\d+)\\sSignal[ ]*=[ ]*(?<" + SIGNAL + ">[a-zA-z]*)"; + + @Override + public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException { + JobStatusResult jobStatusResult = new JobStatusResult(); + try { + String content = ((String) message.getContent()); + Pattern pattern = Pattern.compile(LONESTAR_REGEX); + Matcher matcher = pattern.matcher(content); + if (matcher.find()) { + jobStatusResult.setJobId(matcher.group(JOBID)); + String status = matcher.group(STATUS); + jobStatusResult.setState(getJobState(status, content)); + return jobStatusResult; + } else { + log.error("[EJM]: No matched found for content => \n" + content); + } + } catch (IOException e) { + throw new AiravataException("i[EJM]: Error while reading content of the email message"); + } + return jobStatusResult; + } + + private JobState getJobState(String status, String content) { + switch (status) { + case "Aborted": + return JobState.FAILED; + case "Success": + return JobState.COMPLETE; + default: + return JobState.UNKNOWN; + } + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java new file mode 100644 index 0000000..4a3c88b --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email.parser; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.gfac.core.monitor.EmailParser; +import org.apache.airavata.gfac.core.monitor.JobStatusResult; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.Message; +import javax.mail.MessagingException; +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class PBSEmailParser implements EmailParser { + + private static final Logger log = LoggerFactory.getLogger(PBSEmailParser.class); + + + private static final String REGEX = "[a-zA-Z ]*:[ ]*(?<" + JOBID + ">[a-zA-Z0-9-\\.]*)\\s+[a-zA-Z ]*:[ ]*(?<"+ + JOBNAME + ">[a-zA-Z0-9-\\.]*)\\s+.*\\s+(?<" + STATUS + ">[a-zA-Z\\ ]*)"; + private static final String REGEX_EXIT_STATUS = "Exit_status=(?<" + EXIT_STATUS + ">[\\d]+)"; + public static final String BEGUN_EXECUTION = "Begun execution"; + public static final String EXECUTION_TERMINATED = "Execution terminated"; + public static final String ABORTED_BY_PBS_SERVER = "Aborted by PBS Server"; + + @Override + public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException { + JobStatusResult jobStatusResult = new JobStatusResult(); +// log.info("Parsing -> " + message.getSubject()); + try { + String content = ((String) message.getContent()); + Pattern pattern = Pattern.compile(REGEX); + Matcher matcher = pattern.matcher(content); + if (matcher.find()) { + jobStatusResult.setJobId(matcher.group(JOBID)); + jobStatusResult.setJobName(matcher.group(JOBNAME)); + String statusLine = matcher.group(STATUS); + jobStatusResult.setState(getJobState(statusLine, content)); + return jobStatusResult; + } else { + log.error("[EJM]: No matched found for content => \n" + content); + } + + } catch (IOException e) { + throw new AiravataException("[EJM]: Error while reading content of the email message"); + } + return jobStatusResult; + } + + private JobState getJobState(String statusLine, String content) { + switch (statusLine) { + case BEGUN_EXECUTION: + return JobState.ACTIVE; + case EXECUTION_TERMINATED: + int exitStatus = getExitStatus(content); + if (exitStatus == 0) { + // TODO - Remove rabbitmq client script line from the script. + return JobState.COMPLETE; + } else if (exitStatus == 271) { + return JobState.CANCELED; + } else { + return JobState.FAILED; + } + case ABORTED_BY_PBS_SERVER: + return JobState.FAILED; + default: + return JobState.UNKNOWN; + } + } + + private int getExitStatus(String content) { + Pattern pattern = Pattern.compile(REGEX_EXIT_STATUS); + Matcher matcher = pattern.matcher(content); + if (matcher.find()) { + String group = matcher.group(EXIT_STATUS); + if (group != null && !group.trim().isEmpty()) { + return Integer.valueOf(group.trim()); + } + } + return -1; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java new file mode 100644 index 0000000..9dd32c0 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.email.parser; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.gfac.core.monitor.EmailParser; +import org.apache.airavata.gfac.core.monitor.JobStatusResult; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.Message; +import javax.mail.MessagingException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SLURMEmailParser implements EmailParser { + + private static final Logger log = LoggerFactory.getLogger(SLURMEmailParser.class); + + private static final String REGEX = "[A-Z]*\\s[a-zA-Z]*_[a-z]*=(?<" + JOBID + ">\\d*)[ ]*[a-zA-Z]*=(?<"+ + JOBNAME + ">[a-zA-Z0-9-]*)[ ]*(?<" + STATUS + ">[]a-zA-Z]*),.*"; + + public static final String BEGAN = "Began"; + public static final String ENDED = "Ended"; + public static final String FAILED = "Failed"; + private static final Pattern cancelledStatePattern = Pattern.compile("CANCELLED"); + private static final Pattern pattern = Pattern.compile(REGEX); + + @Override + public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException{ + JobStatusResult jobStatusResult = new JobStatusResult(); + String subject = message.getSubject(); + Matcher matcher = pattern.matcher(subject); + if (matcher.find()) { + jobStatusResult.setJobId(matcher.group(JOBID)); + jobStatusResult.setJobName(matcher.group(JOBNAME)); + jobStatusResult.setState(getJobState(matcher.group(STATUS), subject)); + return jobStatusResult; + } else { + log.error("[EJM]: No matched found for subject -> " + subject); + } + return jobStatusResult; + } + + private JobState getJobState(String state, String subject) { + switch (state.trim()) { + case BEGAN: + return JobState.ACTIVE; + case ENDED: + Matcher matcher = cancelledStatePattern.matcher(subject); + if (matcher.find()) { + return JobState.CANCELED; + } + return JobState.COMPLETE; + case FAILED: + return JobState.FAILED; + default: + log.error("[EJM]: Job State " + state + " isn't handle by SLURM parser"); + return JobState.UNKNOWN; + + } + } + +}
