Repository: airavata Updated Branches: refs/heads/master 0a46c0cba -> eeb00a39c
https://issues.apache.org/jira/browse/AIRAVATA-1078 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1f7a8d94 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1f7a8d94 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1f7a8d94 Branch: refs/heads/master Commit: 1f7a8d946e580d5cdc79833ed38d647db9661ac5 Parents: f558dee Author: lahiru <[email protected]> Authored: Wed Mar 12 23:43:07 2014 -0400 Committer: lahiru <[email protected]> Committed: Wed Mar 12 23:43:07 2014 -0400 ---------------------------------------------------------------------- .../src/main/resources/conf/gfac-config.xml | 32 ++-- .../gfac/handler/AdvancedSCPInputHandler.java | 149 +++++++++++++++++++ .../gfac/handler/AdvancedSCPOutputHandler.java | 110 ++++++++++++++ .../airavata/gfac/handler/SCPOutputHandler.java | 14 +- .../gsi/ssh/api/job/SlurmOutputParser.java | 2 +- 5 files changed, 294 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml b/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml index f03ecde..b0a9bce 100644 --- a/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml +++ b/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml @@ -71,13 +71,27 @@ <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/> </OutHandlers> </Provider> - <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl"> - <InHandlers> - <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/> - <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/> - </InHandlers> - <OutHandlers> - <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/> - </OutHandlers> - </Provider> + <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl"> + <InHandlers> + <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/> + <!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPInputHandler"> + <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + <property name="userName" value="root"/> + <property name="hostName" value="gw98.iu.xsede.org"/> + <property name="outputPath" value="/tmp"/> + </Handler--> + <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/> + </InHandlers> + <OutHandlers> + <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/> + <!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler"> + <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + <property name="userName" value="root"/> + <property name="hostName" value="gw111.iu.xsede.org"/> + <property name="outputPath" value="/tmp"/> + </Handler--> + </OutHandlers> + </Provider> </GFac> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java new file mode 100644 index 0000000..92b4ee6 --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.handler; + +import com.sun.tools.javac.util.Paths; +import org.apache.airavata.common.utils.StringUtil; +import org.apache.airavata.commons.gfac.type.ActualParameter; +import org.apache.airavata.commons.gfac.type.MappingFactory; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.context.JobExecutionContext; +import org.apache.airavata.gfac.context.MessageContext; +import org.apache.airavata.gfac.utils.GFacUtils; +import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.gsi.ssh.api.ServerInfo; +import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.gsi.ssh.impl.PBSCluster; +import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; +import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +import org.apache.airavata.schemas.gfac.URIArrayType; +import org.apache.airavata.schemas.gfac.URIParameterType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.criteria.Path; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.*; + +/** + * This handler will copy input data from gateway machine to airavata + * installed machine, later running handlers can copy the input files to computing resource + * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler"> + <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + <property name="userName" value="airavata"/> + <property name="hostName" value="gw98.iu.xsede.org"/> + <property name="outputPath" value="/home/airavata/outputData"/> + */ +public class AdvancedSCPInputHandler { + private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); + + private String password = null; + + private String publicKeyPath; + + private String passPhrase; + + private String privateKeyPath; + + private String userName; + + private String hostName; + + private String outputPath; + + public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException { + password = properties.get("password"); + passPhrase = properties.get("passPhrase"); + privateKeyPath = properties.get("privateKeyPath"); + publicKeyPath = properties.get("publicKeyPath"); + userName = properties.get("userName"); + hostName = properties.get("hostName"); + outputPath = properties.get("outputPath"); + } + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException { + ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription().getType(); + String standardError = app.getStandardError(); + String standardOutput = app.getStandardOutput(); + String outputDataDirectory = app.getOutputDataDirectory(); + + AuthenticationInfo authenticationInfo = null; + if (password != null) { + authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); + } else { + authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, + this.passPhrase); + } + // Server info + ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + Cluster pbsCluster = null; + MessageContext inputNew = new MessageContext(); + try { + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + String parentPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); + pbsCluster.makeDirectory(parentPath); + MessageContext input = jobExecutionContext.getInMessageContext(); + Set<String> parameters = input.getParameters().keySet(); + for (String paramName : parameters) { + ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName); + String paramValue = MappingFactory.toString(actualParameter); + //TODO: Review this with type + if ("URI".equals(actualParameter.getType().getType().toString())) { + ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath)); + } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { + List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); + List<String> newFiles = new ArrayList<String>(); + for (String paramValueEach : split) { + String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath); + newFiles.add(stageInputFiles); + } + ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); + } + inputNew.getParameters().put(paramName, actualParameter); + } + } catch (Exception e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + jobExecutionContext.setInMessageContext(inputNew); + } + + private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException { + try { + cluster.scpFrom(paramValue, parentPath); + return "file://" + parentPath + File.separator + URI.create(paramValue).toURL().getFile(); + } catch (SSHApiException e) { + log.error("Error tranfering remote file to local file, remote path: " + paramValue); + throw new GFacException(e); + } catch (MalformedURLException e) { + log.error("Error processing input URL"); + throw new GFacException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java new file mode 100644 index 0000000..06496a1 --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.handler; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.context.JobExecutionContext; +import org.apache.airavata.gfac.context.security.GSISecurityContext; +import org.apache.airavata.gfac.context.security.SSHSecurityContext; +import org.apache.airavata.gfac.provider.GFacProviderException; +import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.gsi.ssh.api.ServerInfo; +import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.gsi.ssh.impl.PBSCluster; +import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; +import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Map; + +/** + * This handler will copy outputs from airavata installed local directory + * to a remote location, prior to this handler SCPOutputHandler should be invoked + * Should add following configuration to gfac-config.xml and configure the keys properly + * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler"> + <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + <property name="userName" value="airavata"/> + <property name="hostName" value="gw98.iu.xsede.org"/> + <property name="outputPath" value="/home/airavata/outputData"/> + */ +public class AdvancedSCPOutputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class); + + private String password = null; + + private String publicKeyPath; + + private String passPhrase; + + private String privateKeyPath; + + private String userName; + + private String hostName; + + private String outputPath; + + public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException { + password = properties.get("password"); + passPhrase = properties.get("passPhrase"); + privateKeyPath = properties.get("privateKeyPath"); + publicKeyPath = properties.get("publicKeyPath"); + userName = properties.get("userName"); + hostName = properties.get("hostName"); + outputPath = properties.get("outputPath"); + } + + @Override + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException { + ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription().getType(); + String standardError = app.getStandardError(); + String standardOutput = app.getStandardOutput(); + String outputDataDirectory = app.getOutputDataDirectory(); + + AuthenticationInfo authenticationInfo = null; + if (password != null) { + authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); + } else { + authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, + this.passPhrase); + } + // Server info + ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + try { + Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + String parentPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); + pbsCluster.makeDirectory(parentPath); + pbsCluster.scpTo(parentPath, standardError); + pbsCluster.scpTo(parentPath, standardOutput); + } catch (SSHApiException e) { + log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName); + log.error(e.getMessage()); + throw new GFacException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java index 07b869c..2a1a49a 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java @@ -37,6 +37,7 @@ import org.apache.airavata.gfac.provider.GFacProviderException; import org.apache.airavata.gfac.utils.GFacUtils; import org.apache.airavata.gfac.utils.OutputUtils; import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.util.SSHUtils; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail; import org.apache.airavata.registry.cpi.ChildDataType; @@ -79,8 +80,10 @@ public class SCPOutputHandler extends AbstractHandler{ if (taskData.getAdvancedOutputDataHandling() != null) { outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); + AdvancedOutputDataHandling advancedOutputDataHandling = taskData.getAdvancedOutputDataHandling(); } if (outputDataDir != null) { + app.setOutputDataDirectory(outputDataDir); // These will be useful if we are doing third party transfer localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout"); localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr"); } else { @@ -98,12 +101,12 @@ public class SCPOutputHandler extends AbstractHandler{ detail.setTransferStatus(status); detail.setTransferDescription("STDOUT:" + stdOutStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); - + status.setTransferState(TransferState.COMPLETE); detail.setTransferStatus(status); detail.setTransferDescription("STDERR:" + stdErrStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); - + Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>(); Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); @@ -116,7 +119,12 @@ public class SCPOutputHandler extends AbstractHandler{ status.setTransferState(TransferState.DOWNLOAD); detail.setTransferStatus(status); registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); - + + app.setStandardError(localStdErrFile.getAbsolutePath()); + app.setStandardOutput(localStdOutFile.getAbsolutePath()); + if (outputDataDir != null) { + app.setOutputDataDirectory(outputDataDir); + } } catch (XmlException e) { throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e); } catch (ConnectionException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java index 3b9d2c3..d118ee5 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java @@ -37,7 +37,7 @@ public class SlurmOutputParser implements OutputParser { String lastString = info[info.length -1]; if (lastString.contains("JOB ID")) { // because there's no state - descriptor.setStatus("E"); + descriptor.setStatus("U"); }else{ int column = 0; System.out.println(lastString);
