Repository: airavata Updated Branches: refs/heads/master 34a8147e4 -> b6c7c41e3
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java new file mode 100644 index 0000000..a95f463 --- /dev/null +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java @@ -0,0 +1,218 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/
package org.apache.airavata.gfac.handler;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.transport.TransportException;

import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.util.GFACSSHUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.URIParameterType;
import org.apache.xmlbeans.XmlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Out-handler that copies a finished job's standard output, standard error and
 * (for URI-typed output parameters) the first file of the remote output
 * directory back to the local machine over SCP, and records the transfer
 * state in the registry.
 */
public class SSHOutputHandler extends AbstractHandler {
    private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);

    /**
     * Downloads the job's stdout/stderr and output files and fills the output parameters.
     *
     * <p>Steps: (1) for GSISSH hosts in push monitor mode, restore the job
     * directories from the registry; (2) make sure an SSH security context exists;
     * (3) run the parent handler; (4) SCP stdout/stderr to a local directory and
     * store their contents as data-transfer details; (5) resolve every output
     * parameter either from the remote output directory (URI type) or from
     * stdout/stderr; (6) point the deployment description at the local copies.
     *
     * @param jobExecutionContext context of the job whose outputs are collected
     * @throws GFacHandlerException if the registry, the security context, the
     *         transfer or the output parsing fails, or if no output could be resolved
     * @throws GFacException declared by the handler contract
     */
    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
        restoreJobDirectoriesFromRegistry(jobExecutionContext);

        if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
            try {
                GFACSSHUtils.addSecurityContext(jobExecutionContext);
            } catch (ApplicationSettingsException e) {
                // Pass the throwable so the stack trace is not lost in the log.
                log.error(e.getMessage(), e);
                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
            }
        }
        super.invoke(jobExecutionContext);

        DataTransferDetails detail = new DataTransferDetails();
        TransferStatus status = new TransferStatus();

        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
                .getApplicationDeploymentDescription().getType();
        try {
            // Prefer the GSI security context when one is registered, otherwise use SSH.
            Cluster cluster = null;
            if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
                cluster = ((GSISecurityContext) jobExecutionContext
                        .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
            } else {
                cluster = ((SSHSecurityContext) jobExecutionContext
                        .getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
            }
            if (cluster == null) {
                throw new GFacProviderException("Security context is not set properly");
            } else {
                log.info("Successfully retrieved the Security Context");
            }

            // Get the Stdouts and StdErrs
            String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());

            TaskDetails taskData = jobExecutionContext.getTaskData();
            String taskId = taskData.getTaskID();

            // Local target: <configured dir or /tmp>/<experimentId>-<taskId>
            String outputDataDir = null;
            if (taskData.getAdvancedOutputDataHandling() != null) {
                outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
            }
            if (outputDataDir == null) {
                outputDataDir = File.separator + "tmp";
            }
            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + taskId;

            // mkdirs() returns false both on failure and when the directory already
            // exists, so only treat it as an error if the directory is really missing.
            File localOutputDir = new File(outputDataDir);
            if (!localOutputDir.mkdirs() && !localOutputDir.isDirectory()) {
                throw new IOException("Could not create local output directory " + outputDataDir);
            }

            File localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
            File localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");

            // NOTE(review): the one-second pauses after each copy are kept from the
            // original; their purpose is not visible here - confirm before removing.
            cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
            Thread.sleep(1000);
            cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
            Thread.sleep(1000);

            String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());

            status.setTransferState(TransferState.COMPLETE);
            detail.setTransferStatus(status);
            detail.setTransferDescription("STDOUT:" + stdOutStr);
            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, taskId);

            status.setTransferState(TransferState.COMPLETE);
            detail.setTransferStatus(status);
            detail.setTransferDescription("STDERR:" + stdErrStr);
            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, taskId);

            // The map is replaced (not merged) on every iteration, exactly as before;
            // after the loop it is only used for the emptiness check below.
            Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
            for (Map.Entry<String, Object> entry : output.entrySet()) {
                String paramName = entry.getKey();
                ActualParameter actualParameter = (ActualParameter) entry.getValue();
                if ("URI".equals(actualParameter.getType().getType().toString())) {
                    List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
                    if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
                        // Nothing in the remote output directory: fall back to stdout/stderr.
                        stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
                    } else {
                        // Only the first listed file is fetched and used as the parameter value.
                        String valueList = outputList.get(0);
                        cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
                        jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
                        ((URIParameterType) actualParameter.getType()).setValue(valueList);
                        stringMap = new HashMap<String, ActualParameter>();
                        stringMap.put(paramName, actualParameter);
                    }
                } else {
                    stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
                }
            }
            if (stringMap == null || stringMap.isEmpty()) {
                throw new GFacHandlerException(
                        "Empty Output returned from the Application, Double check the application"
                                + "and ApplicationDescriptor output Parameter Names");
            }
            status.setTransferState(TransferState.DOWNLOAD);
            detail.setTransferStatus(status);
            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, taskId);

            // From here on the deployment description refers to the local copies.
            app.setStandardError(localStdErrFile.getAbsolutePath());
            app.setStandardOutput(localStdOutFile.getAbsolutePath());
            app.setOutputDataDirectory(outputDataDir);
        } catch (XmlException e) {
            throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
        } catch (ConnectionException e) {
            throw new GFacHandlerException(e.getMessage(), e);
        } catch (TransportException e) {
            throw new GFacHandlerException(e.getMessage(), e);
        } catch (IOException e) {
            throw new GFacHandlerException(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Same bookkeeping as any other failure, but restore the interrupt flag
            // afterwards so callers further up the stack can still observe it.
            try {
                recordTransferFailure(jobExecutionContext, detail, status, e);
            } finally {
                Thread.currentThread().interrupt();
            }
            throw new GFacHandlerException("Error in retrieving results", e);
        } catch (Exception e) {
            recordTransferFailure(jobExecutionContext, detail, status, e);
            throw new GFacHandlerException("Error in retrieving results", e);
        }
    }

    /**
     * No handler properties are consumed by this handler.
     *
     * @param properties handler properties from the configuration (ignored)
     */
    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {

    }

    /**
     * For GSISSH hosts monitored in push mode, re-populates the working, input and
     * output directories and the stdout/stderr paths of the deployment description
     * from the job description XML stored in the registry with the task's first job.
     * Does nothing for other hosts or monitor modes, or when no job description is stored.
     */
    private void restoreJobDirectoriesFromRegistry(JobExecutionContext jobExecutionContext)
            throws GFacHandlerException {
        if (!(jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType)) {
            return;
        }
        GsisshHostType gsisshHost =
                (GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
        if (!Constants.PUSH.equals(gsisshHost.getMonitorMode())) {
            return;
        }
        // this is because we don't have the right jobexecution context
        // so attempting to get it from the registry
        log.warn("During the out handler chain jobExecution context came null, so trying to handler");
        ApplicationDescription applicationDeploymentDescription =
                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();

        TaskDetails taskData;
        try {
            taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
        } catch (RegistryException e) {
            log.error("Error retrieving job details from Registry", e);
            throw new GFacHandlerException("Error retrieving job details from Registry", e);
        }

        List<JobDetails> jobDetailsList = (taskData == null) ? null : taskData.getJobDetailsList();
        if (jobDetailsList == null || jobDetailsList.isEmpty()) {
            // Previously surfaced as a bare NullPointerException / IndexOutOfBoundsException.
            throw new GFacHandlerException("No job details found in Registry for task "
                    + jobExecutionContext.getTaskData().getTaskID());
        }

        String jobDescription = jobDetailsList.get(0).getJobDescription();
        if (jobDescription == null) {
            return;
        }

        JobDescriptor jobDescriptor;
        try {
            jobDescriptor = JobDescriptor.fromXML(jobDescription);
        } catch (XmlException e) {
            // Previously swallowed, which led to a NullPointerException on the next statement.
            throw new GFacHandlerException("Cannot parse the job description stored in Registry", e);
        }

        ApplicationDeploymentDescriptionType deployment = applicationDeploymentDescription.getType();
        deployment.setScratchWorkingDirectory(
                jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
        deployment.setInputDataDirectory(jobDescriptor.getInputDirectory());
        deployment.setOutputDataDirectory(jobDescriptor.getOutputDirectory());
        deployment.setStandardError(
                jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
        deployment.setStandardOutput(
                jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
    }

    /**
     * Marks the transfer as FAILED in the registry and stores the error details for the task.
     *
     * @throws GFacHandlerException if the failure itself cannot be persisted
     */
    private void recordTransferFailure(JobExecutionContext jobExecutionContext, DataTransferDetails detail,
                                       TransferStatus status, Exception cause) throws GFacHandlerException {
        try {
            status.setTransferState(TransferState.FAILED);
            detail.setTransferStatus(status);
            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
            GFacUtils.saveErrorDetails(cause.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT,
                    ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
        } catch (Exception e1) {
            throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml b/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml index f21288d..f3881ad 100644 --- a/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml +++ b/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml @@ -23,11 +23,11 @@ <Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl"> <InHandlers> - <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/> - <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/> + <Handler class="org.apache.airavata.gfac.handler.SSHDirectorySetupHandler"/> + <Handler class="org.apache.airavata.gfac.handler.GSISSHInputHandler"/> </InHandlers> <OutHandlers> - <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/> + <Handler class="org.apache.airavata.gfac.handler.GSISSHOutputHandler"/> </OutHandlers> </Provider> </GFac> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index ca55169..6e5ff2f 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -122,7 +122,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) { MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName); monitorManager.addAJobToMonitor(monitorID); - JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId); + jobSubmitter.submit(experimentId, taskId); // even this get returns we cannot use this because subscription has to be done early if ("none".equals(jobID)) { logger.error("Job submission Failed, so we remove the job from monitoring"); @@ -134,6 +134,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst // if the monitoring is pull mode then we add the monitorID for each task after submitting // the job with the jobID, otherwise we don't need the jobID JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId); + jobExecutionContext.setTaskData(task); + jobID = jobExecutionContext.getJobDetails().getJobID(); + logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo); monitorID.setJobExecutionContext(jobExecutionContext);
