http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java new file mode 100644 index 0000000..9f369b1 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.gsissh.provider.impl; + +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; +import org.apache.airavata.gfac.core.provider.AbstractProvider; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; +import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; +import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.ssh.api.job.JobDescriptor; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.JobDetails; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Map; + +//import org.apache.airavata.schemas.gfac.GsisshHostType; + +public class GSISSHProvider extends AbstractProvider { + private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class); + + public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { + + } + + public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + super.initialize(jobExecutionContext); + try { + String hostAddress = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } catch (GFacException e) { + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + + public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + log.info("Invoking GSISSH Provider Invoke ..."); + StringBuffer data = new StringBuffer(); + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() + .getComputeResourceDescription(); + ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription(); + JobDetails jobDetails = new JobDetails(); + Cluster cluster = null; + + try { + if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) { + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster(); + } + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + // This installed path is a mandetory field, because this could change based on the computing resource + JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster); + jobDetails.setJobName(jobDescriptor.getJobName()); + + log.info(jobDescriptor.toXML()); + data.append("jobDesc=").append(jobDescriptor.toXML()); + jobDetails.setJobDescription(jobDescriptor.toXML()); + String jobID = cluster.submitBatchJob(jobDescriptor); + jobExecutionContext.setJobDetails(jobDetails); + if (jobID == null) { + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + } else { + jobDetails.setJobID(jobID.split("\\.")[0]); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); + } + data.append(",jobId=").append(jobDetails.getJobID()); + + // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler + // to perform monitoring, daemon handlers can be accessed from anywhere + monitor(jobExecutionContext); + // we know this host is type GsiSSHHostType + } catch (Exception e) { + String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } finally { + log.info("Saving data for future recovery: "); + log.info(data.toString()); + GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName()); + } + + } + + public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException { +/* List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { + jobExecutionContext.setProperty("cancel","true"); + pullMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + + " to handle by the GridPullMonitorHandler"); + } + } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { + pushMonitorHandler = threadedHandler; + if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { + pushMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + + " to handle by the GridPushMonitorHandler"); + } + } + // have to handle the GridPushMonitorHandler logic + } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + }*/ + } + + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { + //To change body of implemented methods use File | Settings | File Templates. + log.info("canceling the job status in GSISSHProvider!!!!!"); + JobDetails jobDetails = jobExecutionContext.getJobDetails(); + String hostName = jobExecutionContext.getHostName(); + try { + Cluster cluster = null; + if (jobExecutionContext.getSecurityContext(hostName) == null) { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + // This installed path is a mandetory field, because this could change based on the computing resource + if(jobDetails == null) { + log.error("There is not JobDetails so cancelations cannot perform !!!"); + return false; + } + if (jobDetails.getJobID() != null) { + // if this operation success without any exceptions, we can assume cancel operation succeeded. + cluster.cancelJob(jobDetails.getJobID()); + } else { + log.error("No Job Id is set, so cannot perform the cancel operation !!!"); + return false; + } + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); + return true; + // we know this host is type GsiSSHHostType + } catch (SSHApiException e) { + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } catch (Exception e) { + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { + // have to implement the logic to recover a gfac failure + log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID()); + ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() + .getComputeResourceDescription(); + String hostName = jobExecutionContext.getHostName(); + String jobId = ""; + String jobDesc = ""; + try { + String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); + String[] split = pluginData.split(","); + if (split.length < 2) { + try { + this.execute(jobExecutionContext); + } catch (GFacException e) { + log.error("Error while recovering provider", e); + throw new GFacProviderException("Error recovering provider", e); + } + return; + } + jobDesc = split[0].substring(7); + jobId = split[1].substring(6); + + log.info("Following data have recovered: "); + log.info("Job Description: " + jobDesc); + log.info("Job Id: " + jobId); + if (jobId == null || "none".equals(jobId) || + "".equals(jobId)) { + try { + this.execute(jobExecutionContext); + } catch (GFacException e) { + log.error("Error while recovering provider", e); + throw new GFacProviderException("Error recovering provider", e); + } + return; + } + } catch (Exception e) { + log.error("Error while recovering provider", e); + } + try { + // Now we are we have enough data to recover + JobDetails jobDetails = new JobDetails(); + jobDetails.setJobDescription(jobDesc); + jobDetails.setJobID(jobId); + jobExecutionContext.setJobDetails(jobDetails); + if (jobExecutionContext.getSecurityContext(hostName) == null) { + try { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + monitor(jobExecutionContext); + } catch (Exception e) { + log.error("Error while recover the job", e); + throw new GFacProviderException("Error delegating already ran job to Monitoring", e); + } + } + + @Override + public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); + SSHJobSubmission sshJobSubmission = null; + try { + sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); + } catch (AppCatalogException e) { + throw new GFacException("Error while reading compute resource", e); + } + if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + try { + EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor( + sshJobSubmission.getResourceJobManager().getResourceJobManagerType()); + emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); + } catch (AiravataException e) { + throw new GFacHandlerException("Error while activating email job monitoring ", e); + } + return; + } + } +/* + // if email monitor is not activeated or not configure we use pull or push monitor + List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + String jobID = jobExecutionContext.getJobDetails().getJobID(); + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { + log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); + pullMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + + " to handle by the GridPullMonitorHandler"); + } + } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { + pushMonitorHandler = threadedHandler; + if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { + log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); + pushMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + + " to handle by the GridPushMonitorHandler"); + } + } + // have to handle the GridPushMonitorHandler logic + } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + + }*/ + } +}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java new file mode 100644 index 0000000..85e9e29 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.gsissh.security; + +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.AbstractSecurityContext; +import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.globus.gsi.X509Credential; +import org.globus.gsi.gssapi.GlobusGSSCredentialImpl; +import org.globus.gsi.provider.GlobusProvider; +import org.globus.myproxy.GetParams; +import org.globus.myproxy.MyProxy; +import org.globus.myproxy.MyProxyException; +import org.gridforum.jgss.ExtendedGSSCredential; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles GRID related security. + */ +public class GSISecurityContext extends AbstractSecurityContext { + + protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class); + /* + * context name + */ + + private Cluster pbsCluster = null; + + + public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) { + super(credentialReader, requestData); + this.pbsCluster = pbsCluster; + } + + + public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) { + super(credentialReader, requestData); + } + + + public GSISecurityContext(Cluster pbsCluster) { + this.setPbsCluster(pbsCluster); + } + + + + public Cluster getPbsCluster() { + return pbsCluster; + } + + public void setPbsCluster(Cluster pbsCluster) { + this.pbsCluster = pbsCluster; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java new file mode 100644 index 0000000..a3e0241 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java @@ -0,0 +1,304 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.gsissh.security; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo; +import org.globus.gsi.X509Credential; +import org.globus.gsi.gssapi.GlobusGSSCredentialImpl; +import org.globus.gsi.provider.GlobusProvider; +import org.globus.myproxy.GetParams; +import org.globus.myproxy.MyProxy; +import org.globus.myproxy.MyProxyException; +import org.gridforum.jgss.ExtendedGSSCredential; +import org.ietf.jgss.GSSCredential; +import org.ietf.jgss.GSSException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.security.Security; +import java.security.cert.X509Certificate; + +public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo { + protected static final Logger log = LoggerFactory.getLogger(TokenizedMyProxyAuthInfo.class); + + public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90; + + private GSSCredential gssCredentials = null; + + + private CredentialReader credentialReader; + + private RequestData requestData; + + public static final String X509_CERT_DIR = "X509_CERT_DIR"; + + + static { + Security.addProvider(new GlobusProvider()); + try { + setUpTrustedCertificatePath(); + } catch (ApplicationSettingsException e) { + log.error(e.getLocalizedMessage(), e); + } + } + + public static void setUpTrustedCertificatePath(String trustedCertificatePath) { + + File file = new File(trustedCertificatePath); + + if (!file.exists() || !file.canRead()) { + File f = new File("."); + log.info("Current directory " + f.getAbsolutePath()); + throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath); + } else { + System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); + } + } + + private static void setUpTrustedCertificatePath() throws ApplicationSettingsException { + + String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION); + + setUpTrustedCertificatePath(trustedCertificatePath); + } + + public TokenizedMyProxyAuthInfo(CredentialReader credentialReader, RequestData requestData) { + this.credentialReader = credentialReader; + this.requestData = requestData; + try { + properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION)); + } catch (ApplicationSettingsException e) { + log.error("Error while reading server properties", e); + }; + } + + public TokenizedMyProxyAuthInfo(RequestData requestData) { + this.requestData = requestData; + try { + properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION)); + } catch (ApplicationSettingsException e) { + log.error("Error while reading server properties", e); + }; + } + + public GSSCredential getCredentials() throws SecurityException { + + if (gssCredentials == null) { + + try { + gssCredentials = getCredentialsFromStore(); + } catch (Exception e) { + log.error("An exception occurred while retrieving credentials from the credential store. " + + "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId(), e); + } + + if (gssCredentials == null) { + System.out.println("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials"); + try { + gssCredentials = getDefaultCredentials(); + } catch (Exception e) { + throw new SecurityException("Error retrieving my proxy using username password"); + } + } + // if still null, throw an exception + if (gssCredentials == null) { + throw new SecurityException("Unable to retrieve my proxy credentials to continue operation."); + } + } else { + try { + if (gssCredentials.getRemainingLifetime() < CREDENTIAL_RENEWING_THRESH_HOLD) { + try { + return renewCredentials(); + } catch (Exception e) { + throw new SecurityException("Error renewing credentials", e); + } + } + } catch (GSSException e) { + throw new SecurityException("Unable to retrieve remaining life time from credentials.", e); + } + } + + return gssCredentials; + } + + + /** + * Reads the credentials from credential store. + * + * @return If token is found in the credential store, will return a valid credential. Else returns null. + * @throws Exception If an error occurred while retrieving credentials. + */ + public GSSCredential getCredentialsFromStore() throws Exception { + + if (getCredentialReader() == null) { + credentialReader = GFacUtils.getCredentialReader(); + if(credentialReader == null){ + return null; + } + } + + Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(), + getRequestData().getTokenId()); + + if (credential != null) { + if (credential instanceof CertificateCredential) { + + log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() + + " gateway id - " + getRequestData().getGatewayId()); + + CertificateCredential certificateCredential = (CertificateCredential) credential; + + X509Certificate[] certificates = certificateCredential.getCertificates(); + X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates); + + GlobusGSSCredentialImpl cred = new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT); + System.out.print(cred.export(ExtendedGSSCredential.IMPEXP_OPAQUE)); + return cred; + //return new GlobusGSSCredentialImpl(newCredential, + // GSSCredential.INITIATE_AND_ACCEPT); + } else { + log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " + + "Credential type - " + credential.getClass().getName()); + } + } else { + log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and " + + "gateway id - " + getRequestData().getGatewayId()); + } + + return null; + } + + /** + * Renew GSSCredentials. + * Before executing we need to add current host as a trusted renewer. Note to renew credentials + * we dont need user name and password. + * To do that execute following command + * > myproxy-logon -t <LIFETIME></LIFETIME> -s <MY PROXY SERVER> -l <USER NAME> + * E.g :- > myproxy-logon -t 264 -s myproxy.teragrid.org -l us3 + * Enter MyProxy pass phrase: + * A credential has been received for user us3 in /tmp/x509up_u501. + * > myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l ogce -s myproxy.teragrid.org + * + * @return Renewed credentials. + * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials. + * @throws org.apache.airavata.common.exception.ApplicationSettingsException + */ + public GSSCredential renewCredentialsAsATrustedHost() throws GFacException, ApplicationSettingsException { + MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort()); + GetParams getParams = new GetParams(); + getParams.setAuthzCreds(gssCredentials); + getParams.setUserName(getRequestData().getMyProxyUserName()); + getParams.setLifetime(getRequestData().getMyProxyLifeTime()); + try { + return myproxy.get(gssCredentials, getParams); + } catch (MyProxyException e) { + throw new GFacException("An error occurred while renewing security credentials.", e); + } + } + + + /** + * Gets the default proxy certificate. + * + * @return Default my proxy credentials. + * @throws org.apache.airavata.gfac.GFacException If an error occurred while retrieving credentials. + * @throws org.apache.airavata.common.exception.ApplicationSettingsException + */ + public GSSCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException { + MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort()); + try { + return myproxy.get(getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(), + getRequestData().getMyProxyLifeTime()); + } catch (MyProxyException e) { + throw new GFacException("An error occurred while retrieving default security credentials.", e); + } + } + + + /** + * Renews credentials. First try to renew credentials as a trusted renewer. If that failed + * use user name and password to renew credentials. + * + * @return Renewed credentials. + * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials. + * @throws org.apache.airavata.common.exception.ApplicationSettingsException + */ + public GSSCredential renewCredentials() throws GFacException, ApplicationSettingsException { + + // First try to renew credentials as a trusted renewer + try { + gssCredentials = renewCredentialsAsATrustedHost(); + } catch (Exception e) { + log.warn("Renewing credentials as a trusted renewer failed", e); + gssCredentials = getDefaultCredentials(); + } + + return gssCredentials; + } + + /** + * Gets a new proxy certificate given current credentials. + * + * @return The short lived GSSCredentials + * @throws org.apache.airavata.gfac.GFacException If an error is occurred while retrieving credentials. + * @throws org.apache.airavata.common.exception.ApplicationSettingsException + */ + public GSSCredential getProxyCredentials() throws GFacException, ApplicationSettingsException { + + MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort()); + try { + return myproxy.get(gssCredentials, getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(), + getRequestData().getMyProxyLifeTime()); + } catch (MyProxyException e) { + throw new GFacException("An error occurred while renewing security credentials using user/password.", e); + } + } + + public void setGssCredentials(GSSCredential gssCredentials) { + this.gssCredentials = gssCredentials; + } + + public CredentialReader getCredentialReader() { + return credentialReader; + } + + public void setCredentialReader(CredentialReader credentialReader) { + this.credentialReader = credentialReader; + } + + public RequestData getRequestData() { + return requestData; + } + + public void setRequestData(RequestData requestData) { + this.requestData = requestData; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java new file mode 100644 index 0000000..c3978b1 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java @@ -0,0 +1,367 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.gsissh.util; + +import org.airavata.appcatalog.cpi.AppCatalog; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.gsissh.security.TokenizedMyProxyAuthInfo; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.ServerInfo; +import org.apache.airavata.gfac.ssh.api.job.JobDescriptor; +import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration; +import org.apache.airavata.gfac.ssh.impl.GSISSHAbstractCluster; +import org.apache.airavata.gfac.ssh.impl.PBSCluster; +import org.apache.airavata.gfac.ssh.util.CommonUtils; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.*; + + +public class GFACGSISSHUtils { + private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class); + + public static final String PBS_JOB_MANAGER = "pbs"; + public static final String SLURM_JOB_MANAGER = "slurm"; + public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE"; + public static final String LSF_JOB_MANAGER = "lsf"; + + public static int maxClusterCount = 5; + public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>(); + + public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { + JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); + try { + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); + SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE + || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) { + logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); + } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) { + String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework + RequestData requestData = new RequestData(jobExecutionContext.getGatewayID()); + requestData.setTokenId(credentialStoreToken); + PBSCluster pbsCluster = null; + GSISecurityContext context = null; + + TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData); + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + CertificateCredential credential = null; + try { + credential = (CertificateCredential) credentialReader.getCredential(jobExecutionContext.getGatewayID(), credentialStoreToken); + requestData.setMyProxyUserName(credential.getCommunityUser().getUserName()); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + } + } + + String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName()+ + sshJobSubmission.getSshPort(); + boolean recreate = false; + synchronized (clusters) { + if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { + recreate = true; + } else if (clusters.containsKey(key)) { + int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; + if (clusters.get(key).get(i).getSession().isConnected()) { + pbsCluster = (PBSCluster) clusters.get(key).get(i); + } else { + clusters.get(key).remove(i); + recreate = true; + } + if (!recreate) { + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + } catch (Exception e) { + clusters.get(key).remove(i); + logger.info("Connection found the connection map is expired, so we create from the scratch"); + maxClusterCount++; + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + logger.info("Re-using the same connection used with the connection string:" + key); + context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster); + } + } else { + recreate = true; + } + + if (recreate) { + ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(), + sshJobSubmission.getSshPort()); + + JobManagerConfiguration jConfig = null; + String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath(); + String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); + if (jobManager == null) { + logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else { + if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getSLURMJobManager(installedParentPath); + } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getUGEJobManager(installedParentPath); + }else if(LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getLSFJobManager(installedParentPath); + } + } + pbsCluster = new PBSCluster(serverInfo, tokenizedMyProxyAuthInfo, jConfig); + context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster); + List<Cluster> pbsClusters = null; + if (!(clusters.containsKey(key))) { + pbsClusters = new ArrayList<Cluster>(); + } else { + pbsClusters = clusters.get(key); + } + pbsClusters.add(pbsCluster); + clusters.put(key, pbsClusters); + } + } + + jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), context); + } + } catch (Exception e) { + throw new GFacException("An error occurred while creating GSI security context", e); + } + } + + public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) { + JobDescriptor jobDescriptor = new JobDescriptor(); + TaskDetails taskData = jobExecutionContext.getTaskData(); + ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager(); + try { + if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){ + jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS)); + String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS); + + if(jobExecutionContext.getTaskData().isSetEmailAddresses()){ + List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses(); + String elist = GFacUtils.listToCsv(emailList, ','); + if(emailids != null && !emailids.isEmpty()){ + emailids = emailids +"," + elist; + }else{ + emailids = elist; + } + } + if(emailids != null && !emailids.isEmpty()){ + logger.info("Email list: "+ emailids); + jobDescriptor.setMailAddress(emailids); + } + } + } catch (ApplicationSettingsException e) { + logger.error("ApplicationSettingsException : " +e.getLocalizedMessage()); + } + // this is common for any application descriptor + jobDescriptor.setCallBackIp(ServerSettings.getIp()); + jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); + jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir()); + jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir()); + jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath()); + jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput()); + jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError()); + String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount(); + taskData.getEmailAddresses(); + if (computationalProjectAccount == null){ + ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference(); + if (computeResourcePreference != null) { + computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber(); + } + } + if (computationalProjectAccount != null) { + jobDescriptor.setAcountString(computationalProjectAccount); + } + + Random random = new Random(); + int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name + jobDescriptor.setJobName("A" + String.valueOf(i+99999999)); + jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir()); + + List<String> inputValues = new ArrayList<String>(); + MessageContext input = jobExecutionContext.getInMessageContext(); + // sort the inputs first and then build the command List + Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() { + @Override + public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { + return inputDataObjectType.getInputOrder() - t1.getInputOrder(); + } + }; + Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator); + for (Object object : input.getParameters().values()) { + if (object instanceof InputDataObjectType) { + InputDataObjectType inputDOT = (InputDataObjectType) object; + sortedInputSet.add(inputDOT); + } + } + for (InputDataObjectType inputDataObjectType : sortedInputSet) { + if (!inputDataObjectType.isRequiredToAddedToCommandLine()) { + continue; + } + if (inputDataObjectType.getApplicationArgument() != null + && !inputDataObjectType.getApplicationArgument().equals("")) { + inputValues.add(inputDataObjectType.getApplicationArgument()); + } + + if (inputDataObjectType.getValue() != null + && !inputDataObjectType.getValue().equals("")) { + if (inputDataObjectType.getType() == DataType.URI) { + // set only the relative path + String filePath = inputDataObjectType.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + }else { + inputValues.add(inputDataObjectType.getValue()); + } + + } + } + + Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters(); + for (Object outputParam : outputParams.values()) { + if (outputParam instanceof OutputDataObjectType) { + OutputDataObjectType output = (OutputDataObjectType) outputParam; + if (output.getApplicationArgument() != null + && !output.getApplicationArgument().equals("")) { + inputValues.add(output.getApplicationArgument()); + } + if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) { + if (output.getType() == DataType.URI){ + String filePath = output.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + } + } + } + } + jobDescriptor.setInputValues(inputValues); + + jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName()); + jobDescriptor.setShellName("/bin/bash"); + jobDescriptor.setAllEnvExport(true); + jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); + + ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling(); + if (taskScheduling != null) { + int totalNodeCount = taskScheduling.getNodeCount(); + int totalCPUCount = taskScheduling.getTotalCPUCount(); + +// jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand()); + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + + if (totalNodeCount > 0) { + jobDescriptor.setNodes(totalNodeCount); + } + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + if (totalCPUCount > 0) { + int ppn = totalCPUCount / totalNodeCount; + jobDescriptor.setProcessesPerNode(ppn); + jobDescriptor.setCPUCount(totalCPUCount); + } + if (taskScheduling.getWallTimeLimit() > 0) { + jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit())); + if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){ + jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit())); + } + } + + if (taskScheduling.getTotalPhysicalMemory() > 0) { + jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + ""); + } + } else { + logger.error("Task scheduling cannot be null at this point.."); + } + + ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); + List<String> moduleCmds = appDepDescription.getModuleLoadCmds(); + if (moduleCmds != null) { + for (String moduleCmd : moduleCmds) { + jobDescriptor.addModuleLoadCommands(moduleCmd); + } + } + List<String> preJobCommands = appDepDescription.getPreJobCommands(); + if (preJobCommands != null) { + for (String preJobCommand : preJobCommands) { + jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext)); + } + } + + List<String> postJobCommands = appDepDescription.getPostJobCommands(); + if (postJobCommands != null) { + for (String postJobCommand : postJobCommands) { + jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext)); + } + } + + ApplicationParallelismType parallelism = appDepDescription.getParallelism(); + if (parallelism != null){ + if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){ + Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands(); + if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) { + for (JobManagerCommand command : jobManagerCommands.keySet()) { + if (command == JobManagerCommand.SUBMISSION) { + String commandVal = jobManagerCommands.get(command); + jobDescriptor.setJobSubmitter(commandVal); + } + } + } + } + } + return jobDescriptor; + } + + private static String parseCommand(String value, JobExecutionContext jobExecutionContext) { + String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir()); + parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir()); + parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir()); + return parsedValue; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java new file mode 100644 index 0000000..e7c6572 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.impl; + +import com.google.common.eventbus.Subscribe; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.workspace.experiment.JobDetails; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.registry.cpi.CompositeIdentifier; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; + +public class AiravataJobStatusUpdator implements AbstractActivityListener { + private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class); + private Registry airavataRegistry; + + private MonitorPublisher monitorPublisher; + private Publisher publisher; + + + public Registry getAiravataRegistry() { + return airavataRegistry; + } + + public void setAiravataRegistry(Registry airavataRegistry) { + this.airavataRegistry = airavataRegistry; + } + + + @Subscribe + public void updateRegistry(JobStatusChangeRequestEvent jobStatus) throws Exception{ + /* Here we need to parse the jobStatus message and update + the registry accordingly, for now we are just printing to standard Out + */ + JobState state = jobStatus.getState(); + if (state != null) { + try { + String taskID = jobStatus.getJobIdentity().getTaskId(); + String jobID = jobStatus.getJobIdentity().getJobId(); + String expId = jobStatus.getJobIdentity().getExperimentId(); + updateJobStatus(expId,taskID, jobID, state); + logger.debug("expId - {}: Publishing job status for " + jobStatus.getJobIdentity().getJobId() + ":" + + state.toString(),jobStatus.getJobIdentity().getExperimentId()); + JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("JOB"); + MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId()); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(msgCntxt); + } catch (Exception e) { + logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data" + + e.getLocalizedMessage(), e); + throw new Exception("Error persisting job status..", e); + } + } + } + + public void updateJobStatus(String expId, String taskId, String jobID, JobState state) throws Exception { + logger.info("expId - {}: Updating job status for " + jobID + ":" + state.toString(), expId); + CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); + JobDetails details = (JobDetails) airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids); + if (details == null) { + details = new JobDetails(); + } + org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus(); + if (JobState.CANCELED.equals(details.getJobStatus().getJobState()) || + JobState.CANCELING.equals(details.getJobStatus().getJobState())) { + status.setJobState(details.getJobStatus().getJobState()); + } else { + status.setJobState(state); + } + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + details.setJobStatus(status); + details.setJobID(jobID); + logger.debug("expId - {}: Updated job status for " + jobID + ":" + details.getJobStatus().toString(), expId); + airavataRegistry.update(RegistryModelType.JOB_STATUS, status, ids); + } + + @SuppressWarnings("unchecked") + public void setup(Object... configurations) { + for (Object configuration : configurations) { + if (configuration instanceof Registry){ + this.airavataRegistry=(Registry)configuration; + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java new file mode 100644 index 0000000..94029be --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.impl; + +import com.google.common.eventbus.Subscribe; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; + +public class AiravataTaskStatusUpdator implements AbstractActivityListener { + private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class); + private Registry airavataRegistry; + private MonitorPublisher monitorPublisher; + private Publisher publisher; + + public Registry getAiravataRegistry() { + return airavataRegistry; + } + + public void setAiravataRegistry(Registry airavataRegistry) { + this.airavataRegistry = airavataRegistry; + } + + @Subscribe + public void setupTaskStatus(TaskStatusChangeRequestEvent taskStatus) throws Exception{ + try { + updateTaskStatus(taskStatus.getTaskIdentity().getTaskId(), taskStatus.getState()); + logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":" + + taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId()); + TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity()); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("TASK"); + MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId()); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(msgCntxt); + } catch (Exception e) { + String msg = "Error persisting data task status to database..."; + logger.error(msg + e.getLocalizedMessage(), e); + throw new Exception(msg, e); + } + } + + @Subscribe + public void setupTaskStatus(JobStatusChangeEvent jobStatus) throws Exception{ + TaskState state=TaskState.UNKNOWN; + switch(jobStatus.getState()){ + case ACTIVE: + state=TaskState.EXECUTING; break; + case CANCELED: + state=TaskState.CANCELED; break; + case COMPLETE: case FAILED: + state=TaskState.POST_PROCESSING; break; + case HELD: case SUSPENDED: case QUEUED: + state=TaskState.WAITING; break; + case SETUP: + state=TaskState.PRE_PROCESSING; break; + case SUBMITTED: + state=TaskState.STARTED; break; + case UN_SUBMITTED: + state=TaskState.CANCELED; break; + case CANCELING: + state=TaskState.CANCELING; break; + default: + return; + } + try { + updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state); + logger.debug("expId - {}: Publishing task status for " + jobStatus.getJobIdentity().getTaskId() + ":" + + state.toString(), jobStatus.getJobIdentity().getExperimentId()); + TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(), + jobStatus.getJobIdentity().getWorkflowNodeId(), + jobStatus.getJobIdentity().getExperimentId(), + jobStatus.getJobIdentity().getGatewayId()); + TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("TASK"); + MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId()); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(msgCntxt); + + } catch (Exception e) { + logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data" + e.getLocalizedMessage(), e); + throw new Exception("Error persisting task status..", e); + } + } + + public TaskState updateTaskStatus(String taskId, TaskState state) throws Exception { + TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId); + if(details == null) { + logger.error("Task details cannot be null at this point"); + throw new Exception("Task details cannot be null at this point"); + } + org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus(); + if(!TaskState.CANCELED.equals(details.getTaskStatus().getExecutionState()) + && !TaskState.CANCELING.equals(details.getTaskStatus().getExecutionState())){ + status.setExecutionState(state); + }else{ + status.setExecutionState(details.getTaskStatus().getExecutionState()); + } + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + details.setTaskStatus(status); + logger.debug("Updating task status for "+taskId+":"+details.getTaskStatus().toString()); + + airavataRegistry.update(RegistryModelType.TASK_STATUS, status, taskId); + return status.getExecutionState(); + } + + public void setup(Object... configurations) { + for (Object configuration : configurations) { + if (configuration instanceof Registry){ + this.airavataRegistry=(Registry)configuration; + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } + } + + + @Subscribe + public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException { + String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); + logger.debug("Task Output changed event received for workflow node : " + + taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too. + MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId()); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java new file mode 100644 index 0000000..092774b --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.impl; + +import com.google.common.eventbus.Subscribe; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.messaging.event.WorkflowIdentifier; +import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryModelType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; + +public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener { + private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class); + + private Registry airavataRegistry; + private MonitorPublisher monitorPublisher; + private Publisher publisher; + + + + + public Registry getAiravataRegistry() { + return airavataRegistry; + } + + public void setAiravataRegistry(Registry airavataRegistry) { + this.airavataRegistry = airavataRegistry; + } + + @Subscribe + public void setupWorkflowNodeStatus(TaskStatusChangeEvent taskStatus) throws Exception{ + WorkflowNodeState state=WorkflowNodeState.UNKNOWN; + switch(taskStatus.getState()){ + case CANCELED: + state=WorkflowNodeState.CANCELED; break; + case COMPLETED: + state=WorkflowNodeState.COMPLETED; break; + case CONFIGURING_WORKSPACE: + state=WorkflowNodeState.INVOKED; break; + case FAILED: + state=WorkflowNodeState.FAILED; break; + case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING: + state=WorkflowNodeState.EXECUTING; break; + case STARTED: + state=WorkflowNodeState.INVOKED; break; + case CANCELING: + state=WorkflowNodeState.CANCELING; break; + default: + return; + } + try { + String expId = taskStatus.getTaskIdentity().getExperimentId(); + updateWorkflowNodeStatus(expId, taskStatus.getTaskIdentity().getWorkflowNodeId(), state); + logger.debug("expId - {}: Publishing workflow node status for " + taskStatus.getTaskIdentity().getWorkflowNodeId() + + ":" + state.toString(), taskStatus.getTaskIdentity().getExperimentId()); + WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(), + taskStatus.getTaskIdentity().getExperimentId(), + taskStatus.getTaskIdentity().getGatewayId()); + WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("WFNODE"); + MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, taskStatus.getTaskIdentity().getGatewayId()); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + + publisher.publish(msgCntxt); + } catch (Exception e) { + logger.error("expId - " + taskStatus.getTaskIdentity().getExperimentId() + ": Error persisting data" + + e.getLocalizedMessage(), e); + throw new Exception("Error persisting workflow node status..", e); + } + } + + public void updateWorkflowNodeStatus(String experimentId, String workflowNodeId, WorkflowNodeState state) throws Exception { + logger.info("expId - {}: Updating workflow node status for "+workflowNodeId+":"+state.toString(), experimentId); + WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); + if(details == null) { + details = new WorkflowNodeDetails(); + details.setNodeInstanceId(workflowNodeId); + } + WorkflowNodeStatus status = new WorkflowNodeStatus(); + status.setWorkflowNodeState(state); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + details.setWorkflowNodeStatus(status); + airavataRegistry.update(RegistryModelType.WORKFLOW_NODE_STATUS, status, workflowNodeId); + } + + public void setup(Object... configurations) { + for (Object configuration : configurations) { + if (configuration instanceof Registry){ + this.airavataRegistry=(Registry)configuration; + } else if (configuration instanceof MonitorPublisher){ + this.monitorPublisher=(MonitorPublisher) configuration; + } else if (configuration instanceof Publisher){ + this.publisher=(Publisher) configuration; + } + } + } +}
