http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java new file mode 100644 index 0000000..e53fe09 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -0,0 +1,280 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.util; + +import org.apache.airavata.common.logger.AiravataLogger; +import org.apache.airavata.common.logger.AiravataLoggerFactory; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.GFacHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.monitor.UserMonitorData; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +public class CommonUtils { + private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class); + + public static String getChannelID(MonitorID monitorID) { + return monitorID.getUserName() + "-" + monitorID.getComputeResourceDescription().getHostName(); + } + + public static String getRoutingKey(MonitorID monitorID) { + return "*." + monitorID.getUserName() + "." + monitorID.getComputeResourceDescription().getIpAddresses().get(0); + } + + public static String getChannelID(String userName,String hostAddress) { + return userName + "-" + hostAddress; + } + + public static String getRoutingKey(String userName,String hostAddress) { + return "*." + userName + "." + hostAddress; + } + + public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID, JobExecutionContext jobExecutionContext) throws AiravataMonitorException { + synchronized (queue) { + Iterator<UserMonitorData> iterator = queue.iterator(); + while (iterator.hasNext()) { + UserMonitorData next = iterator.next(); + if (next.getUserName().equals(monitorID.getUserName())) { + // then this is the right place to update + List<HostMonitorData> monitorIDs = next.getHostMonitorData(); + for (HostMonitorData host : monitorIDs) { + if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) { + // ok we found right place to add this monitorID + host.addMonitorIDForHost(monitorID); + logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," + + " task {}", monitorID.getExperimentID(), monitorID.getTaskID()); + return; + } + } + // there is a userMonitor object for this user name but no Hosts for this host + // so we have to create new Hosts + HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext); + hostMonitorData.addMonitorIDForHost(monitorID); + next.addHostMonitorData(hostMonitorData); + logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," + + " task {}", monitorID.getExperimentID(), monitorID.getTaskID()); + return; + } + } + HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext); + hostMonitorData.addMonitorIDForHost(monitorID); + + UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName()); + userMonitorData.addHostMonitorData(hostMonitorData); + try { + queue.put(userMonitorData); + logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," + + " task {}", monitorID.getExperimentID(), monitorID.getTaskID()); + } catch (InterruptedException e) { + throw new AiravataMonitorException(e); + } + } + } + + private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) { + return comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId()) && + comRes_1.getHostName().equals(comRes_2.getHostName()); + } + + public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){ + Iterator<MonitorID> iterator = queue.iterator(); + while(iterator.hasNext()){ + MonitorID next = iterator.next(); + if (monitorID.getUserName().equals(next.getUserName()) && + CommonUtils.isEqual(monitorID.getComputeResourceDescription(), next.getComputeResourceDescription())) { + return false; + } + } + return true; + } + + /** + * This method doesn't have to be synchronized because it will be invoked by HPCPullMonitor which already synchronized + * @param monitorID + * @throws AiravataMonitorException + */ + public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException { + if (userMonitorData.getUserName().equals(monitorID.getUserName())) { + // then this is the right place to update + List<HostMonitorData> hostMonitorData = userMonitorData.getHostMonitorData(); + Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator(); + while (iterator1.hasNext()) { + HostMonitorData iHostMonitorID = iterator1.next(); + if (isEqual(iHostMonitorID.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) { + Iterator<MonitorID> iterator2 = iHostMonitorID.getMonitorIDs().iterator(); + while (iterator2.hasNext()) { + MonitorID iMonitorID = iterator2.next(); + if (iMonitorID.getJobID().equals(monitorID.getJobID()) + || iMonitorID.getJobName().equals(monitorID.getJobName())) { + // OK we found the object, we cannot do list.remove(object) states of two objects + // could be different, thats why we check the jobID + iterator2.remove(); + logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " + + "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString()); + + return; + } + } + } + } + } + logger.info("Cannot find the given MonitorID in the queue with userName " + + monitorID.getUserName() + " and jobID " + monitorID.getJobID()); + logger.info("This might not be an error because someone else removed this job from the queue"); + } + + + public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { + List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers(); + + for (GFacHandlerConfig handlerClassName : handlers) { + Class<? extends GFacHandler> handlerClass; + GFacHandler handler; + try { + handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); + handler = handlerClass.newInstance(); + handler.initProperties(handlerClassName.getProperties()); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot load handler class " + handlerClassName, e); + } catch (InstantiationException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } + try { + handler.invoke(jobExecutionContext); + } catch (Exception e) { + // TODO: Better error reporting. + throw new GFacException("Error Executing a OutFlow Handler", e); + } + } + } + + /** + * Update job count for a given set of paths. + * @param curatorClient - CuratorFramework instance + * @param changeCountMap - map of change job count with relevant path + * @param isAdd - Should add or reduce existing job count by the given job count. + */ + public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) { + StringBuilder changeZNodePaths = new StringBuilder(); + try { + for (String path : changeCountMap.keySet()) { + if (isAdd) { + CommonUtils.checkAndCreateZNode(curatorClient, path); + } + byte[] byteData = curatorClient.getData().forPath(path); + String nodeData; + if (byteData == null) { + if (isAdd) { + curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes()); + } else { + // This is not possible, but we handle in case there any data zookeeper communication failure + logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0"); + curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes()); + } + } else { + nodeData = new String(byteData); + if (isAdd) { + curatorClient.setData().withVersion(-1).forPath(path, + String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes()); + } else { + int previousCount = Integer.parseInt(nodeData); + int removeCount = changeCountMap.get(path); + if (previousCount >= removeCount) { + curatorClient.setData().withVersion(-1).forPath(path, + String.valueOf(previousCount - removeCount).getBytes()); + } else { + // This is not possible, do we need to reset the job count to 0 ? + logger.error("Requested remove job count is " + removeCount + + " which is higher than the existing job count " + previousCount + + " in " + path + " path."); + } + } + } + changeZNodePaths.append(path).append(":"); + } + + // update stat node to trigger orchestrator watchers + if (changeCountMap.size() > 0) { + changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1); + curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes()); + } + } catch (Exception e) { + logger.error("Error while writing job count to zookeeper", e); + } + + } + + /** + * Increase job count by one and update the zookeeper + * @param monitorID - Job monitorId + */ + public static void increaseZkJobCount(MonitorID monitorID) { + Map<String, Integer> addMap = new HashMap<String, Integer>(); + addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1); + updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true); + } + + /** + * Construct and return the path for a given MonitorID , eg: /stat/{username}/{resourceName}/job + * @param monitorID - Job monitorId + * @return + */ + public static String getJobCountUpdatePath(MonitorID monitorID){ + return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName()) + .append("/").append(monitorID.getComputeResourceDescription().getHostName()).append("/").append(Constants.JOB).toString(); + } + + /** + * Check whether znode is exist in given path if not create a new znode + * @param curatorClient - zookeeper instance + * @param path - path to check znode + * @throws KeeperException + * @throws InterruptedException + */ + private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception { + if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist + if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist + checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/")))); + } + curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); + } + } +}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java new file mode 100644 index 0000000..08c3f67 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.util; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.bouncycastle.jce.provider.BouncyCastleProvider; + + +import java.io.*; +import java.security.*; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.security.spec.InvalidKeySpecException; + +public class X509Helper { + + static { + // parsing of RSA key fails without this + java.security.Security.addProvider(new BouncyCastleProvider()); + } + + + + public static KeyStore keyStoreFromPEM(String proxyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase); + } + + public static KeyStore keyStoreFromPEM(String certFile, + String keyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile)); + //System.out.println(cert.toString()); + + // this works for proxy files, too, since it skips over the certificate + BufferedReader reader = new BufferedReader(new FileReader(keyFile)); + String line = null; + StringBuilder builder = new StringBuilder(); + boolean inKey = false; + while((line=reader.readLine()) != null) { + if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) { + inKey = true; + } + if (inKey) { + builder.append(line); + builder.append(System.getProperty("line.separator")); + } + if (line.contains("-----END RSA PRIVATE KEY-----")) { + inKey = false; + } + } + String privKeyPEM = builder.toString(); + //System.out.println(privKeyPEM); + + // using BouncyCastle +// PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM)); +// Object object = pemParser.readObject(); +// +// PrivateKey privKey = null; +// if(object instanceof KeyPair){ +// privKey = ((KeyPair)object).getPrivate(); +// } + // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency + /* + // Base64 decode the data + byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM); + + // PKCS8 decode the encoded RSA private key + java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded); + KeyFactory kf = KeyFactory.getInstance("RSA"); + PrivateKey privKey = kf.generatePrivate(keySpec); + //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec); + */ + //System.out.println(privKey.toString()); + +// KeyStore keyStore = KeyStore.getInstance("PKCS12"); +// keyStore.load(null,null); +// +// KeyStore.PrivateKeyEntry entry = +// new KeyStore.PrivateKeyEntry(privKey, +// new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert}); +// KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray()); +// keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot); + +// return keyStore; + //TODO: Problem with BouncyCastle version used in gsissh + throw new CertificateException("Method not implemented"); + + } + + + public static KeyStore trustKeyStoreFromCertDir() throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException, ApplicationSettingsException { + return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location")); + } + + public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null,null); + + File dir = new File(certDir); + for(File file : dir.listFiles()) { + if (!file.isFile()) { + continue; + } + if (!file.getName().endsWith(".0")) { + continue; + } + + try { + //System.out.println("reading file "+file.getName()); + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file)); + //System.out.println(cert.toString()); + + KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert); + + ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null); + } catch (KeyStoreException e) { + } catch (CertificateParsingException e) { + continue; + } + + } + + return ks; + } +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java new file mode 100644 index 0000000..74642dc --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.context; + +import org.apache.airavata.gfac.ssh.api.ServerInfo; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; + +public class SSHAuthWrapper { + private ServerInfo serverInfo; + + private AuthenticationInfo authenticationInfo; + + private String key; + + public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) { + this.serverInfo = serverInfo; + this.authenticationInfo = authenticationInfo; + this.key = key; + } + + public ServerInfo getServerInfo() { + return serverInfo; + } + + public AuthenticationInfo getAuthenticationInfo() { + return authenticationInfo; + } + + public String getKey() { + return key; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java new file mode 100644 index 0000000..9481188 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.handler; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; + +/** + * This handler will copy input data from gateway machine to airavata + * installed machine, later running handlers can copy the input files to computing resource + * <Handler class="AdvancedSCPOutputHandler"> + * <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + * <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + * <property name="userName" value="airavata"/> + * <property name="hostName" value="gw98.iu.xsede.org"/> + * <property name="inputPath" value="/home/airavata/outputData"/> + */ +public class AdvancedSCPInputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); + public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; + public static final int DEFAULT_SSH_PORT = 22; + + private String password = null; + + private String publicKeyPath; + + private String passPhrase; + + private String privateKeyPath; + + private String userName; + + private String hostName; + + private String inputPath; + + public void initProperties(Properties properties) throws GFacHandlerException { + password = (String) properties.get("password"); + passPhrase = (String) properties.get("passPhrase"); + privateKeyPath = (String) properties.get("privateKeyPath"); + publicKeyPath = (String) properties.get("publicKeyPath"); + userName = (String) properties.get("userName"); + hostName = (String) properties.get("hostName"); + inputPath = (String) properties.get("inputPath"); + } + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + super.invoke(jobExecutionContext); + int index = 0; + int oldIndex = 0; + List<String> oldFiles = new ArrayList<String>(); + MessageContext inputNew = new MessageContext(); + StringBuffer data = new StringBuffer("|"); + Cluster pbsCluster = null; + + try { + String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); + if (pluginData != null) { + try { + oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); + oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); + if (oldIndex == oldFiles.size()) { + log.info("Old data looks good !!!!"); + } else { + oldIndex = 0; + oldFiles.clear(); + } + } catch (NumberFormatException e) { + log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); + } + } + + AuthenticationInfo authenticationInfo = null; + if (password != null) { + authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); + } else { + authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, + this.passPhrase); + } + + // Server info + String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); + if (index < oldIndex) { + parentPath = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + (new File(parentPath)).mkdirs(); + StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + // here doesn't matter what the job manager is because we are only doing some file handling + // not really dealing with monitoring or job submission, so we pa + + MessageContext input = jobExecutionContext.getInMessageContext(); + Set<String> parameters = input.getParameters().keySet(); + for (String paramName : parameters) { + InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); + String paramValue = inputParamType.getValue(); + // TODO: Review this with type + if (inputParamType.getType() == DataType.URI) { + try { + URL file = new URL(paramValue); + String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT; + GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT); + pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster(); + paramValue = file.getPath(); + } catch (MalformedURLException e) { + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; + GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT); + pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster(); + log.error(e.getLocalizedMessage(), e); + } + + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + inputParamType.setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath); + inputParamType.setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + } + // FIXME: what is the thrift model DataType equivalent for URIArray type? +// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { +// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); +// List<String> newFiles = new ArrayList<String>(); +// for (String paramValueEach : split) { +// try { +// URL file = new URL(paramValue); +// this.userName = file.getUserInfo(); +// this.hostName = file.getHost(); +// paramValueEach = file.getPath(); +// } catch (MalformedURLException e) { +// log.error(e.getLocalizedMessage(), e); +// } +// if (index < oldIndex) { +// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +// newFiles.add(oldFiles.get(index)); +// data.append(oldFiles.get(index++)).append(","); +// } else { +// String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath); +// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); +// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// newFiles.add(stageInputFiles); +// } +// } +// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +// } + inputNew.getParameters().put(paramName, inputParamType); + } + } catch (Exception e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + jobExecutionContext.setInMessageContext(inputNew); + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.invoke(jobExecutionContext); + } + + private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException { + try { + cluster.scpFrom(paramValue, parentPath); + return "file://" + parentPath + File.separator + (new File(paramValue)).getName(); + } catch (SSHApiException e) { + log.error("Error tranfering remote file to local file, remote path: " + paramValue); + throw new GFacException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java new file mode 100644 index 0000000..320f236 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.handler; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; + +/** + * This handler will copy outputs from airavata installed local directory + * to a remote location, prior to this handler SCPOutputHandler should be invoked + * Should add following configuration to gfac-config.xml and configure the keys properly + * <Handler class="AdvancedSCPOutputHandler"> + <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/> + <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/> + <property name="userName" value="airavata"/> + <property name="hostName" value="gw98.iu.xsede.org"/> + <property name="outputPath" value="/home/airavata/outputData"/> + <property name="passPhrase" value="/home/airavata/outputData"/> + <property name="password" value="/home/airavata/outputData"/> + + */ +public class AdvancedSCPOutputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class); + + public static final int DEFAULT_SSH_PORT = 22; + + private String password = null; + + private String publicKeyPath; + + private String passPhrase; + + private String privateKeyPath; + + private String userName; + + private String hostName; + + private String outputPath; + + + public void initProperties(Properties properties) throws GFacHandlerException { + password = (String)properties.get("password"); + passPhrase = (String)properties.get("passPhrase"); + privateKeyPath = (String)properties.get("privateKeyPath"); + publicKeyPath = (String)properties.get("publicKeyPath"); + userName = (String)properties.get("userName"); + hostName = (String)properties.get("hostName"); + outputPath = (String)properties.get("outputPath"); + } + + @Override + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + Cluster pbsCluster = null; + AuthenticationInfo authenticationInfo = null; + if (password != null) { + authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); + } else { + authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, + this.passPhrase); + } + try { + String hostName = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostName) == null) { + try { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + String standardError = jobExecutionContext.getStandardError(); + String standardOutput = jobExecutionContext.getStandardOutput(); + super.invoke(jobExecutionContext); + // Server info + if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){ + try{ + URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir()); + this.userName = outputPathURL.getUserInfo(); + this.hostName = outputPathURL.getHost(); + outputPath = outputPathURL.getPath(); + } catch (MalformedURLException e) { + log.error(e.getLocalizedMessage(),e); + } + } + String key = GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT); + pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster(); + if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){ + outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID() + + File.separator; + pbsCluster.makeDirectory(outputPath); + } + pbsCluster.scpTo(outputPath, standardError); + pbsCluster.scpTo(outputPath, standardOutput); + List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); + Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); + Set<String> keys = output.keySet(); + for (String paramName : keys) { + OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName); + if (outputDataObjectType.getType() == DataType.URI) { + // for failed jobs outputs are not generated. So we should not download outputs + if (GFacUtils.isFailedJob(jobExecutionContext)){ + continue; + } + String downloadFile = outputDataObjectType.getValue(); + if(downloadFile == null || !(new File(downloadFile).isFile())){ + GFacUtils.saveErrorDetails(jobExecutionContext, "Empty Output returned from the application", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacHandlerException("Empty Output returned from the application.." ); + } + pbsCluster.scpTo(outputPath, downloadFile); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length()); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(outputPath + File.separatorChar + fileName); + dataObjectType.setName(paramName); + dataObjectType.setType(DataType.URI); + dataObjectType.setIsRequired(outputDataObjectType.isIsRequired()); + dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine()); + dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument()); + dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery()); + outputArray.add(dataObjectType); + }else if (outputDataObjectType.getType() == DataType.STDOUT) { + pbsCluster.scpTo(outputPath, standardOutput); + String fileName = standardOutput.substring(standardOutput.lastIndexOf(File.separatorChar)+1, standardOutput.length()); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(outputPath + File.separatorChar + fileName); + dataObjectType.setName(paramName); + dataObjectType.setType(DataType.STDOUT); + dataObjectType.setIsRequired(outputDataObjectType.isIsRequired()); + dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine()); + dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument()); + dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery()); + outputArray.add(dataObjectType); + }else if (outputDataObjectType.getType() == DataType.STDERR) { + pbsCluster.scpTo(outputPath, standardError); + String fileName = standardError.substring(standardError.lastIndexOf(File.separatorChar)+1, standardError.length()); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(outputPath + File.separatorChar + fileName); + dataObjectType.setName(paramName); + dataObjectType.setType(DataType.STDERR); + dataObjectType.setIsRequired(outputDataObjectType.isIsRequired()); + dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine()); + dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument()); + dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery()); + outputArray.add(dataObjectType); + } + } + registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); + } catch (SSHApiException e) { + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName); + log.error(e.getMessage()); + throw new GFacHandlerException(e); + } catch (Exception e) { + try { + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException(e); + } + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + // TODO: Auto generated method body. + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java new file mode 100644 index 0000000..61a1805 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java @@ -0,0 +1,78 @@ +package org.apache.airavata.gfac.ssh.handler; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; +import java.util.Properties; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.util.HandleOutputs; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.RegistryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NewSSHOutputHandler extends AbstractHandler{ + + private static final Logger log = LoggerFactory.getLogger(NewSSHOutputHandler.class); + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + String hostAddress = jobExecutionContext.getHostName(); + Cluster cluster = null; + // Security Context and connection + try { + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + } catch (Exception e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + + super.invoke(jobExecutionContext); + List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster); + try { + registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); + } catch (RegistryException e) { + throw new GFacHandlerException(e); + } + + + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + // TODO: Auto generated method body. + } + + @Override + public void initProperties(Properties properties) throws GFacHandlerException { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java new file mode 100644 index 0000000..fb86dd3 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.handler; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; + +public class SSHDirectorySetupHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class); + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + try { + String hostAddress = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } + } catch (Exception e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + + log.info("Setup SSH job directorties"); + super.invoke(jobExecutionContext); + makeDirectory(jobExecutionContext); + + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + // TODO: Auto generated method body. + } + + private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + Cluster cluster = null; + try{ + String hostAddress = jobExecutionContext.getHostName(); + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacHandlerException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + String workingDirectory = jobExecutionContext.getWorkingDir(); + cluster.makeDirectory(workingDirectory); + if(!jobExecutionContext.getInputDir().equals(workingDirectory)) + cluster.makeDirectory(jobExecutionContext.getInputDir()); + if(!jobExecutionContext.getOutputDir().equals(workingDirectory)) + cluster.makeDirectory(jobExecutionContext.getOutputDir()); + + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + status.setTransferState(TransferState.DIRECTORY_SETUP); + detail.setTransferStatus(status); + detail.setTransferDescription("Working directory = " + workingDirectory); + + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + } catch (Exception e) { + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir()); + try { + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e); + } + + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java new file mode 100644 index 0000000..277ff0e --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.handler; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +public class SSHInputHandler extends AbstractHandler { + + private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class); + + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + DataTransferDetails detail = new DataTransferDetails(); + detail.setTransferDescription("Input Data Staging"); + TransferStatus status = new TransferStatus(); + int index = 0; + int oldIndex = 0; + List<String> oldFiles = new ArrayList<String>(); + StringBuffer data = new StringBuffer("|"); + MessageContext inputNew = new MessageContext(); + Cluster cluster = null; + + try { + String hostAddress = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + try { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + log.info("Invoking SCPInputHandler"); + super.invoke(jobExecutionContext); + + + MessageContext input = jobExecutionContext.getInMessageContext(); + Set<String> parameters = input.getParameters().keySet(); + for (String paramName : parameters) { + InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); + String paramValue = inputParamType.getValue(); + //TODO: Review this with type + if (inputParamType.getType() == DataType.URI) { + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + inputParamType.setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue); + inputParamType.setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + }// FIXME: what is the thrift model DataType equivalent for URIArray type? +// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { +// if (index < oldIndex) { +// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +// ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); +// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index +// }else{ +// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); +// List<String> newFiles = new ArrayList<String>(); +// for (String paramValueEach : split) { +// String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach); +// status.setTransferState(TransferState.UPLOAD); +// detail.setTransferStatus(status); +// detail.setTransferDescription("Input Data Staged: " + stageInputFiles); +// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); +// newFiles.add(stageInputFiles); +// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); +// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// } +// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +// } +// } + inputNew.getParameters().put(paramName, inputParamType); + } + } catch (Exception e) { + log.error(e.getMessage()); + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + jobExecutionContext.setInMessageContext(inputNew); + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + // TODO: Auto generated method body. + } + + private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { + int i = paramValue.lastIndexOf(File.separator); + String substring = paramValue.substring(i + 1); + try { + String targetFile = jobExecutionContext.getInputDir() + File.separator + substring; + if(paramValue.startsWith("scp:")){ + paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); + cluster.scpThirdParty(paramValue, targetFile); + }else{ + if(paramValue.startsWith("file")){ + paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); + } + boolean success = false; + int j = 1; + while(!success){ + try { + cluster.scpTo(targetFile, paramValue); + success = true; + } catch (Exception e) { + log.info(e.getLocalizedMessage()); + Thread.sleep(2000); + if(j==3) { + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + } + j++; + } + } + return targetFile; + } catch (Exception e) { + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java new file mode 100644 index 0000000..7c5538a --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.handler; + +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.impl.OutputUtils; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.DataTransferDetails; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TransferState; +import org.apache.airavata.model.workspace.experiment.TransferStatus; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +public class SSHOutputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class); + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + String hostAddress = jobExecutionContext.getHostName(); + try { + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } + } catch (Exception e) { + log.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + + super.invoke(jobExecutionContext); + DataTransferDetails detail = new DataTransferDetails(); + detail.setTransferDescription("Output data staging"); + TransferStatus status = new TransferStatus(); + + Cluster cluster = null; + try { + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + + // Get the Stdouts and StdErrs + String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID()); + + TaskDetails taskData = jobExecutionContext.getTaskData(); + String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp"); + File localStdOutFile; + File localStdErrFile; + //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work +// if (taskData.getAdvancedOutputDataHandling() != null) { +// outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); +// } + if (outputDataDir == null) { + outputDataDir = File.separator + "tmp"; + } + outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID(); + (new File(outputDataDir)).mkdirs(); + + + localStdOutFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stdout"); + localStdErrFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stderr"); +// cluster.makeDirectory(outputDataDir); + int i = 0; + String stdOutStr = ""; + while (stdOutStr.isEmpty()) { + try { + cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath()); + stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); + } catch (Exception e) { + log.error(e.getLocalizedMessage()); + Thread.sleep(2000); + } + i++; + if (i == 3) break; + } + Thread.sleep(1000); + cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath()); + Thread.sleep(1000); + + String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); + status.setTransferState(TransferState.STDOUT_DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + status.setTransferState(TransferState.STDERROR_DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + + List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); + Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); + Set<String> keys = output.keySet(); + for (String paramName : keys) { + OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(paramName); + if (DataType.URI == actualParameter.getType()) { + List<String> outputList = null; + int retry = 3; + while (retry > 0) { + outputList = cluster.listDirectory(jobExecutionContext.getOutputDir()); + if (outputList.size() > 0) { + break; + } + retry--; + Thread.sleep(2000); + } + + if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) { + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); + Set<String> strings = output.keySet(); + outputArray.clear(); + for (String key : strings) { + OutputDataObjectType actualParameter1 = (OutputDataObjectType) output.get(key); + if (DataType.URI == actualParameter1.getType()) { + String downloadFile = actualParameter1.getValue(); + cluster.scpFrom(downloadFile, outputDataDir); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + actualParameter1.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.URI); + outputArray.add(dataObjectType); + }else if (DataType.STDOUT == actualParameter.getType()) { + String fileName = localStdOutFile.getName(); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + actualParameter.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.STDOUT); + outputArray.add(dataObjectType); + }else if (DataType.STDERR == actualParameter.getType()) { + String fileName = localStdErrFile.getName(); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + actualParameter.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.STDERR); + outputArray.add(dataObjectType); + } + } + break; + } else if (outputList.size() == 1) {//FIXME: Ultrascan case + String valueList = outputList.get(0); + cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir); + String outputPath = outputDataDir + File.separator + valueList; + jobExecutionContext.addOutputFile(outputPath); + actualParameter.setValue(outputPath); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(outputPath); + dataObjectType.setName(paramName); + dataObjectType.setType(DataType.URI); + outputArray.add(dataObjectType); + } + } else { + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); + } + } + if (outputArray == null || outputArray.isEmpty()) { + log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names"); + if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) { + throw new GFacHandlerException( + "Empty Output returned from the Application, Double check the application" + + "and ApplicationDescriptor output Parameter Names"); + } + } + jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath()); + jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath()); + jobExecutionContext.setOutputDir(outputDataDir); + status.setTransferState(TransferState.DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription(outputDataDir); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); + + } catch (Exception e) { + try { + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error in retrieving results", e); + } + + } + + @Override + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + // TODO: Auto generated method body. + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } +}
