http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 0000000,b716099..3756140 mode 000000,100644..100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@@ -1,0 -1,747 +1,747 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + package org.apache.airavata.gfac.core; + + import org.airavata.appcatalog.cpi.AppCatalog; + import org.airavata.appcatalog.cpi.AppCatalogException; + import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; + import org.apache.airavata.common.exception.ApplicationSettingsException; + import org.apache.airavata.common.utils.AiravataZKUtils; + import org.apache.airavata.common.utils.DBUtil; + import org.apache.airavata.common.utils.MonitorPublisher; + import org.apache.airavata.common.utils.ServerSettings; + import org.apache.airavata.credential.store.store.CredentialReader; + import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; + import org.apache.airavata.gfac.Constants; + import org.apache.airavata.gfac.ExecutionMode; + import org.apache.airavata.gfac.GFacConfiguration; + import org.apache.airavata.gfac.GFacException; + import org.apache.airavata.gfac.core.context.JobExecutionContext; + import org.apache.airavata.gfac.core.handler.GFacHandlerException; + import org.apache.airavata.gfac.core.states.GfacExperimentState; + import org.apache.airavata.gfac.core.states.GfacHandlerState; + import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; + import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; + import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; + import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; + import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; + import org.apache.airavata.model.messaging.event.JobIdentifier; + import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; + import org.apache.airavata.model.messaging.event.TaskIdentifier; + import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent; + import org.apache.airavata.model.workspace.experiment.ActionableGroup; + import org.apache.airavata.model.workspace.experiment.CorrectiveAction; 
+ import org.apache.airavata.model.workspace.experiment.ErrorCategory; + import org.apache.airavata.model.workspace.experiment.ErrorDetails; + import org.apache.airavata.model.workspace.experiment.Experiment; + import org.apache.airavata.model.workspace.experiment.ExperimentState; + import org.apache.airavata.model.workspace.experiment.JobDetails; + import org.apache.airavata.model.workspace.experiment.JobState; + import org.apache.airavata.model.workspace.experiment.JobStatus; + import org.apache.airavata.model.workspace.experiment.TaskState; -import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; ++import org.apache.airavata.experiment.catalog.impl.RegistryFactory; + import org.apache.airavata.registry.cpi.ChildDataType; + import org.apache.airavata.registry.cpi.CompositeIdentifier; + import org.apache.airavata.registry.cpi.Registry; + import org.apache.airavata.registry.cpi.RegistryException; + import org.apache.airavata.registry.cpi.RegistryModelType; + import org.apache.curator.framework.CuratorFramework; + import org.apache.curator.utils.ZKPaths; + import org.apache.zookeeper.CreateMode; + import org.apache.zookeeper.KeeperException; + import org.apache.zookeeper.ZooDefs; + import org.apache.zookeeper.data.ACL; + import org.apache.zookeeper.data.Stat; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.w3c.dom.Document; + import org.w3c.dom.Element; + import org.w3c.dom.Node; + import org.w3c.dom.NodeList; + + import javax.xml.xpath.XPath; + import javax.xml.xpath.XPathConstants; + import javax.xml.xpath.XPathExpression; + import javax.xml.xpath.XPathExpressionException; + import javax.xml.xpath.XPathFactory; + import java.io.BufferedReader; + import java.io.File; + import java.io.FileNotFoundException; + import java.io.FileReader; + import java.io.IOException; + import java.io.InputStream; + import java.net.InetAddress; + import java.net.URISyntaxException; + import java.net.UnknownHostException; + import 
java.nio.ByteBuffer;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers shared by GFac core.
 *
 * <p>Covers stream/file reading, wall-time formatting, registry persistence of job, error and
 * experiment state, ZooKeeper (Curator) bookkeeping for experiments and handlers, and
 * app-catalog lookups.
 *
 * <p>NOTE(review): ZooKeeper paths here are joined with {@link File#separator}, which is "/" only
 * on Unix-like hosts. Left unchanged, presumably so paths stay consistent with those built
 * elsewhere (e.g. AiravataZKUtils) — confirm before switching to a literal "/".
 */
public class GFacUtils {
    private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
    public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    private GFacUtils() {
    }

    /**
     * Reads everything remaining in {@code in} into a String and closes the stream.
     *
     * @param in stream to read; always closed before this method returns
     * @return the stream content decoded with the platform default charset
     * @throws java.io.IOException if reading fails
     */
    public static String readFromStream(InputStream in) throws IOException {
        try {
            // Collect raw bytes and decode once at the end. Decoding every 1 KiB chunk on its
            // own corrupts multi-byte characters that straddle a chunk boundary.
            ByteArrayOutputStream content = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int read;
            while ((read = in.read(buf)) > 0) {
                content.write(buf, 0, read);
            }
            // Platform default charset, same as the previous new String(buf, 0, read).
            return content.toString();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    log.warn("Cannot close InputStream: "
                            + in.getClass().getName(), e);
                }
            }
        }
    }

    /**
     * Returns true if the job's status string equals the cluster "completed" status
     * ({@code JobStatus.C}), otherwise false.
     *
     * @param job job whose status is inspected
     * @return whether the job has finished
     */
    public static boolean isJobFinished(JobDescriptor job) {
        return org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus());
    }

    /**
     * Formats a wall time given in minutes as {@code hours:minutes:00}.
     *
     * <p>Components are not zero-padded, except for a literal "00" hour below one hour.
     * For example, 5 gives "00:5:00" and 65 gives "1:5:00".
     *
     * @param maxWalltime wall time in minutes
     * @return formatted wall time
     */
    public static String maxWallTimeCalculator(int maxWalltime) {
        if (maxWalltime < 60) {
            return "00:" + maxWalltime + ":00";
        } else {
            int minutes = maxWalltime % 60;
            int hours = maxWalltime / 60;
            return hours + ":" + minutes + ":00";
        }
    }

    /**
     * LSF variant of {@link #maxWallTimeCalculator(int)}: formats minutes as
     * {@code hours:minutes}, without a seconds field (e.g. 65 gives "1:5").
     *
     * @param maxWalltime wall time in minutes
     * @return formatted wall time
     */
    public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
        if (maxWalltime < 60) {
            return "00:" + maxWalltime;
        } else {
            int minutes = maxWalltime % 60;
            int hours = maxWalltime / 60;
            return hours + ":" + minutes;
        }
    }

    /**
     * Reports whether the job runs in synchronous mode.
     *
     * <p>Can be used to perform framework operations specific to the execution mode.
     *
     * @param jobExecutionContext context whose GFac configuration is inspected
     * @return false only when the configured execution mode is ASYNCHRONOUS, true otherwise
     */
    public static boolean isSynchronousMode(
            JobExecutionContext jobExecutionContext) {
        GFacConfiguration gFacConfiguration = jobExecutionContext
                .getGFacConfiguration();
        return !ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration.getExecutionMode());
    }

    /**
     * Reads a text file line by line.
     *
     * <p>Each line is terminated with {@code Constants.NEWLINE}, including the last one.
     * A failure to close the reader is logged and otherwise ignored.
     *
     * @param file path of the file to read
     * @return the file content
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if reading fails
     */
    public static String readFileToString(String file)
            throws FileNotFoundException, IOException {
        BufferedReader instream = null;
        try {
            instream = new BufferedReader(new FileReader(file));
            StringBuilder buff = new StringBuilder();
            String temp;
            while ((temp = instream.readLine()) != null) {
                buff.append(temp);
                buff.append(Constants.NEWLINE);
            }
            return buff.toString();
        } finally {
            if (instream != null) {
                try {
                    instream.close();
                } catch (IOException e) {
                    log.warn("Cannot close FileinputStream", e);
                }
            }
        }
    }

    /**
     * Returns true when {@code appHost} is this machine's canonical host name,
     * {@code Constants.LOCALHOST} or {@code Constants._127_0_0_1}.
     *
     * @param appHost host name to test
     * @return whether the host refers to the local machine
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    public static boolean isLocalHost(String appHost)
            throws UnknownHostException {
        String localHost = InetAddress.getLocalHost().getCanonicalHostName();
        return (localHost.equals(appHost)
                || Constants.LOCALHOST.equals(appHost) || Constants._127_0_0_1
                .equals(appHost));
    }

    /**
     * Appends the current date to {@code name}.
     *
     * <p>The date comes from {@code Date.toString()}, with spaces and colons replaced by
     * underscores.
     *
     * @param name base name
     * @return {@code name + "_" + sanitizedDate}
     */
    public static String createUniqueNameWithDate(String name) {
        String date = new Date().toString();
        date = date.replaceAll(" ", "_");
        date = date.replaceAll(":", "_");
        return name + "_" + date;
    }

    /**
     * Evaluates an XPath expression against {@code doc}.
     *
     * @param doc document to search
     * @param expression XPath expression selecting a node set
     * @return the matched nodes that are elements, in document order
     * @throws XPathExpressionException if the expression is invalid
     */
    public static List<Element> getElementList(Document doc, String expression) throws XPathExpressionException {
        XPathFactory xPathFactory = XPathFactory.newInstance();
        XPath xPath = xPathFactory.newXPath();
        XPathExpression expr = xPath.compile(expression);
        NodeList nodeList = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
        List<Element> elementList = new ArrayList<Element>();
        for (int i = 0; i < nodeList.getLength(); i++) {
            Node item = nodeList.item(i);
            if (item instanceof Element) {
                elementList.add((Element) item);
            }
        }
        return elementList;
    }

    /**
     * Builds a {@code gsiftp://host/localPath} URI string.
     *
     * <p>The "gsiftp://" scheme is only prepended when missing, and a "/" is only inserted when
     * {@code host} does not already end with one.
     *
     * @param host host, optionally already prefixed with the gsiftp scheme
     * @param localPath path appended after the host
     * @return the URI string
     * @throws URISyntaxException declared for callers; not thrown by this implementation
     */
    public static String createGsiftpURIAsString(String host, String localPath)
            throws URISyntaxException {
        StringBuilder buf = new StringBuilder();
        if (!host.startsWith("gsiftp://"))
            buf.append("gsiftp://");
        buf.append(host);
        if (!host.endsWith("/"))
            buf.append("/");
        buf.append(localPath);
        return buf.toString();
    }

    /**
     * Persists {@code details} with the given job state and publishes a status-change event.
     *
     * <p>The job details are stored under the current task, and a
     * {@link JobStatusChangeRequestEvent} is then published.
     *
     * @throws GFacException wrapping any failure
     */
    public static void saveJobStatus(JobExecutionContext jobExecutionContext,
                                     JobDetails details, JobState state) throws GFacException {
        try {
            // Save the job details (carrying the new status) first, then announce the change.
            Registry registry = jobExecutionContext.getRegistry();
            JobStatus status = new JobStatus();
            status.setJobState(state);
            details.setJobStatus(status);
            registry.add(ChildDataType.JOB_DETAIL, details,
                    new CompositeIdentifier(jobExecutionContext.getTaskData()
                            .getTaskID(), details.getJobID()));
            JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
                    jobExecutionContext.getGatewayID());
            JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
            jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent);
        } catch (Exception e) {
            throw new GFacException("Error persisting job status"
                    + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Sets a new timestamped status on {@code details} and updates the registry's
     * JOB_DETAIL entry keyed by the job ID.
     *
     * @throws GFacException wrapping any failure
     */
    public static void updateJobStatus(JobExecutionContext jobExecutionContext,
                                       JobDetails details, JobState state) throws GFacException {
        try {
            Registry registry = jobExecutionContext.getRegistry();
            JobStatus status = new JobStatus();
            status.setJobState(state);
            status.setTimeOfStateChange(Calendar.getInstance()
                    .getTimeInMillis());
            details.setJobStatus(status);
            registry.update(
                    org.apache.airavata.registry.cpi.RegistryModelType.JOB_DETAIL,
                    details, details.getJobID());
        } catch (Exception e) {
            throw new GFacException("Error persisting job status"
                    + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Records an {@link ErrorDetails} entry against the current task.
     *
     * <p>The actionable group is always {@code GATEWAYS_ADMINS}.
     *
     * @throws GFacException wrapping any failure
     */
    public static void saveErrorDetails(
            JobExecutionContext jobExecutionContext, String errorMessage,
            CorrectiveAction action, ErrorCategory errorCatogory)
            throws GFacException {
        try {
            Registry registry = jobExecutionContext.getRegistry();
            ErrorDetails details = new ErrorDetails();
            details.setActualErrorMessage(errorMessage);
            details.setCorrectiveAction(action);
            details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
            details.setCreationTime(Calendar.getInstance().getTimeInMillis());
            details.setErrorCategory(errorCatogory);
            registry.add(ChildDataType.ERROR_DETAIL, details,
                    jobExecutionContext.getTaskData().getTaskID());
        } catch (Exception e) {
            throw new GFacException("Error persisting job status"
                    + e.getLocalizedMessage(), e);
        }
    }

    /**
     * Indexes input objects by name. A later duplicate name overwrites an earlier one.
     */
    public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
        Map<String, Object> map = new HashMap<String, Object>();
        for (InputDataObjectType objectType : experimentData) {
            map.put(objectType.getName(), objectType);
        }
        return map;
    }

    /**
     * Indexes output objects by name. A later duplicate name overwrites an earlier one.
     */
    public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
        Map<String, Object> map = new HashMap<String, Object>();
        for (OutputDataObjectType objectType : experimentData) {
            map.put(objectType.getName(), objectType);
        }
        return map;
    }

    /**
     * Reads the experiment state stored in ZooKeeper.
     *
     * @return the stored state, or {@code UNKNOWN} when none is recorded
     */
    public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
                                                           JobExecutionContext jobExecutionContext)
            throws Exception {
        String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
                .getExperimentID());
        if (expState == null || expState.isEmpty()) {
            return GfacExperimentState.UNKNOWN;
        }
        return GfacExperimentState.findByValue(Integer.valueOf(expState));
    }

    /**
     * Ensures the handler znode and its state child exist, then marks the handler as INVOKING.
     *
     * @return always true
     */
    public static boolean createHandlerZnode(CuratorFramework curatorClient,
                                             JobExecutionContext jobExecutionContext, String className)
            throws Exception {
        // Identical ZooKeeper call sequence to the general overload, with INVOKING as the state.
        return createHandlerZnode(curatorClient, jobExecutionContext, className, GfacHandlerState.INVOKING);
    }

    /**
     * Ensures the handler znode and its state child exist, then writes {@code state} into
     * the state child.
     *
     * <p>The write is a versioned setData, so it fails if the node changed since the last read.
     *
     * @return always true
     */
    public static boolean createHandlerZnode(CuratorFramework curatorClient,
                                             JobExecutionContext jobExecutionContext, String className,
                                             GfacHandlerState state) throws Exception {
        String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
                jobExecutionContext.getExperimentID(), className);
        String stateNodePath = handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE;
        Stat exists = curatorClient.checkExists().forPath(handlerPath);
        if (exists == null) {
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(handlerPath, new byte[0]);
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(stateNodePath, new byte[0]);
        } else if (curatorClient.checkExists().forPath(stateNodePath) == null) {
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(stateNodePath, new byte[0]);
        }

        exists = curatorClient.checkExists().forPath(stateNodePath);
        if (exists != null) {
            curatorClient.setData().withVersion(exists.getVersion())
                    .forPath(stateNodePath, String.valueOf(state.getValue()).getBytes());
        }
        return true;
    }

    /**
     * Writes {@code state} into the handler's state znode.
     *
     * <p>If the state znode is missing, the znodes are created via
     * {@link #createHandlerZnode(CuratorFramework, JobExecutionContext, String, GfacHandlerState)}.
     *
     * @return always false (kept for compatibility with existing callers)
     */
    public static boolean updateHandlerState(CuratorFramework curatorClient,
                                             JobExecutionContext jobExecutionContext, String className,
                                             GfacHandlerState state) throws Exception {
        String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
                jobExecutionContext.getExperimentID(), className);
        String stateNodePath = handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE;
        Stat exists = curatorClient.checkExists().forPath(stateNodePath);
        if (exists != null) {
            curatorClient.setData().withVersion(exists.getVersion())
                    .forPath(stateNodePath, String.valueOf(state.getValue()).getBytes());
        } else {
            createHandlerZnode(curatorClient, jobExecutionContext, className, state);
        }
        return false;
    }

    /**
     * Reads the handler state stored in ZooKeeper.
     *
     * @return the stored state; {@code UNKNOWN} if the state znode does not exist; {@code null}
     *     if any error occurs (the error is logged)
     */
    public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
                                                   JobExecutionContext jobExecutionContext, String className) {
        try {
            String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(), className);
            String stateNodePath = handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE;
            Stat exists = curatorClient.checkExists().forPath(stateNodePath);
            if (exists != null) {
                String stateVal = new String(curatorClient.getData().storingStatIn(exists)
                        .forPath(stateNodePath));
                return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
            }
            // No state node recorded yet.
            return GfacHandlerState.UNKNOWN;
        } catch (Exception e) {
            log.error("Error occured while getting zk node status", e);
            return null;
        }
    }

    /**
     * Registers an experiment under {@code experimentNode/pickedChild}.
     *
     * <p>For a brand-new experiment, this creates the experiment node, its "state" child
     * (LAUNCHED) and the delivery-tag node.
     *
     * <p>For a relaunched experiment, any stale cancel delivery tag is removed. If the
     * experiment previously lived under a different GFac node, its data (two levels of
     * children) and delivery tag are copied to the new location and the old data is deleted.
     *
     * <p>Dangerous: moves experiment data between nodes.
     *
     * @return always true
     */
    public static boolean createExperimentEntryForPassive(String experimentID,
                                                          String taskID, CuratorFramework curatorClient, String experimentNode,
                                                          String pickedChild, String tokenId, long deliveryTag) throws Exception {
        String experimentPath = experimentNode + File.separator + pickedChild;
        String newExperimentPath = experimentPath + File.separator + experimentID;
        String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
        if (oldExperimentPath == null) {
            // A very new experiment: create every node from scratch.
            log.info("This is a new Job, so creating all the experiment docs from the scratch");
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
            String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(newExperimentPath + File.separator + "state",
                            String.valueOf(GfacExperimentState.LAUNCHED.getValue()).getBytes());

            if (curatorClient.checkExists().forPath(stateNodePath) != null) {
                log.info("Created the node: " + stateNodePath + " successfully !");
            } else {
                log.error("Error creating node: " + stateNodePath);
            }
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
        } else {
            log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
            removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
            if (newExperimentPath.equals(oldExperimentPath)) {
                log.info("Re-launch experiment came to the same GFac instance");
            } else {
                log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
                curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
                        curatorClient.getData().forPath(oldExperimentPath));
                copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // copy children up to depth 2
                String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
                Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
                if (exists != null) {
                    curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                            .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
                                    curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
                    ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
                }
                // The old data is only deleted after every copy above succeeded. Any exception
                // propagates first and leaves the old data in place.
                log.info("After a successful copying of experiment data for an old experiment we delete the old data");
                log.info("Deleting experiment data: " + oldExperimentPath);
                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
            }
        }
        return true;
    }

    /** Deletes the experiment's cancel-delivery-tag znode (and its children) if present. */
    private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
        Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
        if (exists != null) {
            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
                    experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
        }
    }

    /**
     * Copies the children of {@code oldPath} (with their data) under {@code newPath}.
     *
     * <p>Recursion continues while the remaining depth is positive. A depth of 2 copies
     * children and grandchildren.
     */
    private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
        // Computed once: decrementing depth inside the loop (as before) stopped recursion after
        // the first child, so the subtrees of later children were never copied.
        int childDepth = depth - 1;
        for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
            String oldChildPath = oldPath + File.separator + childNode;
            String newChildPath = newPath + File.separator + childNode;
            Stat stat = curatorClient.checkExists().forPath(oldChildPath);
            log.info("Creating new znode: " + newChildPath);
            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                    .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
            if (childDepth > 0) {
                copyChildren(curatorClient, oldChildPath, newChildPath, childDepth);
            }
        }
    }

    /**
     * Finds the znode of an experiment under any GFac server node.
     *
     * <p>This may return a path even when the owning server is down, because it iterates the
     * existing experiment nodes rather than the live gfac-server nodes.
     *
     * @param experimentID experiment to look for
     * @param curatorClient ZooKeeper client
     * @return the experiment's znode path, or null if no server node holds it
     * @throws Exception on ZooKeeper or settings errors
     */
    public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
        String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
        List<String> children = curatorClient.getChildren().forPath(experimentNode);
        for (String pickedChild : children) {
            String experimentPath = experimentNode + File.separator + pickedChild;
            String newExpNode = experimentPath + File.separator + experimentID;
            Stat exists = curatorClient.checkExists().forPath(newExpNode);
            if (exists != null) {
                return newExpNode;
            }
        }
        return null;
    }

    /**
     * Records a cancel request by creating the experiment's cancel-delivery-tag znode.
     *
     * @return false if the experiment is unknown or a cancel is already pending; true otherwise
     */
    public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
        if (experimentEntry == null) {
            // Should have been caught by request validation; GFac shouldn't get an invalid experiment.
            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
                    "This happen when experiment completed and already removed from the zookeeper");
            return false;
        }
        // Only one cancel operation may exist for a given experiment.
        Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
        if (cancelState != null) {
            return false;
        }
        // Save the cancel delivery tag so it can be acknowledged at the end.
        curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
                .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
        return true;
    }

    /**
     * Returns true if the experiment's "operation" child znode contains "cancel".
     *
     * @return false when the experiment or its "operation" znode does not exist
     */
    public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception {
        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
        if (experimentEntry == null) {
            return false;
        }
        String operationPath = experimentEntry + File.separator + "operation";
        // Check the node actually being read; reading a missing node throws NoNodeException.
        Stat exists = curatorClient.checkExists().forPath(operationPath);
        if (exists == null) {
            return false;
        }
        String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(operationPath));
        return "cancel".equals(operation);
    }

    /**
     * Stores {@code data} in the handler's znode.
     *
     * <p>This is a no-op when there is no Curator client. If the znode is missing, an error is
     * logged.
     *
     * @throws GFacHandlerException wrapping any failure
     */
    public static void saveHandlerData(JobExecutionContext jobExecutionContext,
                                       StringBuffer data, String className) throws GFacHandlerException {
        try {
            CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
            if (curatorClient != null) {
                String expZnodeHandlerPath = AiravataZKUtils
                        .getExpZnodeHandlerPath(
                                jobExecutionContext.getExperimentID(),
                                className);
                Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
                if (exists != null) {
                    curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
                } else {
                    log.error("Saving Handler data failed, Stat is null");
                }
            }
        } catch (Exception e) {
            throw new GFacHandlerException(e);
        }
    }

    /**
     * Reads the data stored in the handler's znode.
     *
     * @return the data, or null when there is no Curator client or the znode does not exist
     */
    public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
        CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
        if (curatorClient != null) {
            String expZnodeHandlerPath = AiravataZKUtils
                    .getExpZnodeHandlerPath(
                            jobExecutionContext.getExperimentID(),
                            className);
            Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
            if (exists == null) {
                // Reading a missing znode would throw NoNodeException; report "no data" instead.
                return null;
            }
            return new String(curatorClient.getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
        }
        return null;
    }

    /**
     * Creates a credential reader over the credential-store database configured in
     * {@link ServerSettings}.
     *
     * @return the reader, or null if the JDBC driver class cannot be found (the error is logged)
     */
    public static CredentialReader getCredentialReader()
            throws ApplicationSettingsException, IllegalAccessException,
            InstantiationException {
        try {
            String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
            String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
            String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
            String driver = ServerSettings.getCredentialStoreDBDriver();
            return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
                    driver));
        } catch (ClassNotFoundException e) {
            log.error("Not able to find driver: " + e.getLocalizedMessage(), e);
            return null;
        }
    }

    /**
     * Looks up a LOCAL job submission in the app catalog.
     *
     * @throws AppCatalogException wrapping any failure
     */
    public static LOCALSubmission getLocalJobSubmission(String submissionId) throws AppCatalogException {
        try {
            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
            return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
        } catch (Exception e) {
            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
            log.error(errorMsg, e);
            throw new AppCatalogException(errorMsg, e);
        }
    }

    /**
     * Looks up a UNICORE job submission in the app catalog.
     *
     * @throws AppCatalogException wrapping any failure
     */
    public static UnicoreJobSubmission getUnicoreJobSubmission(String submissionId) throws AppCatalogException {
        try {
            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
            return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
        } catch (Exception e) {
            String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
            log.error(errorMsg, e);
            throw new AppCatalogException(errorMsg, e);
        }
    }

    /**
     * Looks up an SSH job submission in the app catalog.
     *
     * @throws AppCatalogException wrapping any failure
     */
    public static SSHJobSubmission getSSHJobSubmission(String submissionId) throws AppCatalogException {
        try {
            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
            return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
        } catch (Exception e) {
            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
            log.error(errorMsg, e);
            throw new AppCatalogException(errorMsg, e);
        }
    }

    /**
     * Joins the strings with {@code separator}, with no trailing separator.
     *
     * @param listOfStrings strings to join
     * @param separator separator character
     * @return the joined string; empty for an empty list
     */
    public static String listToCsv(List<String> listOfStrings, char separator) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < listOfStrings.size(); i++) {
            if (i > 0) {
                sb.append(separator);
            }
            sb.append(listOfStrings.get(i));
        }
        return sb.toString();
    }

    /** Encodes a long as 8 big-endian bytes. */
    public static byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(x);
        return buffer.array();
    }

    /** Decodes 8 big-endian bytes (as produced by {@link #longToBytes(long)}) into a long. */
    public static long bytesToLong(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.put(bytes);
        buffer.flip(); // switch from writing to reading
        return buffer.getLong();
    }

    /**
     * Updates the experiment's status in the registry.
     *
     * <p>A CANCELED or CANCELING status already stored is preserved rather than overwritten.
     *
     * @return the state actually written
     * @throws RegistryException on registry failures
     */
    public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
        Registry airavataRegistry = RegistryFactory.getDefaultRegistry();
        Experiment details = (Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
        if (details == null) {
            details = new Experiment();
            details.setExperimentID(experimentId);
        }
        // A freshly created (or status-less) experiment has no current status; guard against NPE.
        org.apache.airavata.model.workspace.experiment.ExperimentStatus currentStatus = details.getExperimentStatus();
        ExperimentState currentState = currentStatus == null ? null : currentStatus.getExperimentState();
        boolean cancelInProgress = ExperimentState.CANCELED.equals(currentState)
                || ExperimentState.CANCELING.equals(currentState);

        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
        status.setExperimentState(cancelInProgress ? currentState : state);
        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
        details.setExperimentStatus(status);
        log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
        airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
        return status.getExperimentState();
    }

    /** Returns true if the context's job status is FAILED. */
    public static boolean isFailedJob(JobExecutionContext jec) {
        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
        return jobStatus.getJobState() == JobState.FAILED;
    }

    /**
     * Acknowledges a pending cancel request by deleting the experiment's cancel-delivery-tag
     * znode.
     *
     * @return true if a pending cancel node was found and deleted; false otherwise
     */
    public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
        if (experimentEntry == null) {
            // Should have been caught by request validation; GFac shouldn't get an invalid experiment.
            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
                    "This happen when experiment completed and already removed from the CuratorFramework");
            return false;
        }
        // Built only after the null check, so no "null..." path is ever formed.
        String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
        Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
        if (cancelState != null) {
            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
            return true;
        }
        return false;
    }

    /** Publishes a {@link TaskStatusChangeRequestEvent} for the context's task via {@code publisher}. */
    public static void publishTaskStatus(JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state) {
        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                jobExecutionContext.getExperimentID(),
                jobExecutionContext.getGatewayID());
        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java index 0000000,307d8c3..f28b6e4 mode 000000,100644..100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@@ -1,0 -1,562 +1,562 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + package org.apache.airavata.gfac.ssh.util; + + import org.airavata.appcatalog.cpi.AppCatalog; + import org.airavata.appcatalog.cpi.AppCatalogException; + import org.apache.airavata.common.exception.ApplicationSettingsException; + import org.apache.airavata.common.utils.ServerSettings; + import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; + import org.apache.airavata.gfac.Constants; + import org.apache.airavata.gfac.GFacException; + import org.apache.airavata.gfac.RequestData; + import org.apache.airavata.gfac.core.JobDescriptor; + import org.apache.airavata.gfac.core.JobManagerConfiguration; + import org.apache.airavata.gfac.core.cluster.Cluster; + import org.apache.airavata.gfac.core.cluster.ServerInfo; + import org.apache.airavata.gfac.core.context.JobExecutionContext; + import org.apache.airavata.gfac.core.context.MessageContext; + import org.apache.airavata.gfac.core.handler.GFacHandlerException; + import org.apache.airavata.gfac.core.GFacUtils; + import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster; + import org.apache.airavata.gfac.gsi.ssh.impl.PBSCluster; + import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; + import org.apache.airavata.gfac.gsi.ssh.util.CommonUtils; + import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper; + import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; + import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo; + import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; + import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; + import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; + import org.apache.airavata.model.appcatalog.appinterface.DataType; + import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; + import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; + import 
org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.*;

/**
 * SSH helpers for GFac: opens (and pools) SSH cluster connections, registers them as
 * {@link SSHSecurityContext}s on a {@link JobExecutionContext}, and builds the
 * {@link JobDescriptor} used for batch job submission.
 */
public class GFACSSHUtils {
    private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class);

    /**
     * Pool of opened cluster connections, keyed by a connection string (login user + host + port,
     * or the key of an {@link SSHAuthWrapper}). Every access in this class synchronizes on this map.
     */
    public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();

    public static final String PBS_JOB_MANAGER = "pbs";
    public static final String SLURM_JOB_MANAGER = "slurm";
    public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
    public static final String LSF_JOB_MANAGER = "LSF";

    /**
     * Number of pooled connections opened per key before existing ones are reused. Incremented each
     * time a pooled connection fails the liveness probe in {@link #findReusableCluster(String)}.
     */
    public static int maxClusterCount = 5;

    /**
     * Adds the security context for the job's own compute resource. For third-party machines use
     * {@link #addSecurityContext(JobExecutionContext, SSHAuthWrapper)} instead.
     *
     * <p>Only acts when the preferred job submission protocol is SSH and the SSH job submission uses
     * GSI, SSH_KEYS or USERNAME_PASSWORD security; for GLOBUS or UNICORE it only logs an error.
     *
     * @param jobExecutionContext context to read configuration from and to register the context in
     * @throws GFacException if the SSH job submission cannot be loaded from the app catalog or the
     *                       cluster connection cannot be set up
     * @throws ApplicationSettingsException part of the method signature
     */
    public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
        JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
        JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
        if (preferredJobSubmissionProtocol == JobSubmissionProtocol.GLOBUS || preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
            logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
        } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) {
            try {
                AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
                SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
                SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol();
                if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS || securityProtocol == SecurityProtocol.USERNAME_PASSWORD) {
                    SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
                    String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
                    RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
                    requestData.setTokenId(credentialStoreToken);

                    ServerInfo serverInfo = new ServerInfo(null, jobExecutionContext.getHostName());

                    Cluster pbsCluster = null;
                    try {
                        AuthenticationInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData);
                        String installedParentPath = getInstalledParentPath(jobExecutionContext);

                        // Loads the credentials into the auth-info object; they are also read below.
                        SSHCredential credentials = ((TokenizedSSHAuthInfo) tokenizedSSHAuthInfo).getCredentials();
                        if (credentials.getPrivateKey() == null || credentials.getPublicKey() == null
                                || securityProtocol == SecurityProtocol.USERNAME_PASSWORD) {
                            // fall back to username/password authentication with the server-configured password
                            Properties configurationProperties = ServerSettings.getProperties();
                            tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD));
                        }
                        // The login user name from the compute resource preference wins; otherwise use the portal user.
                        String loginUser = jobExecutionContext.getLoginUserName();
                        if (loginUser == null) {
                            loginUser = credentials.getPortalUserName();
                        }
                        serverInfo.setUserName(loginUser);
                        jobExecutionContext.getExperiment().setUserName(loginUser);

                        String key = loginUser + jobExecutionContext.getHostName() + serverInfo.getPort();
                        synchronized (clusters) {
                            pbsCluster = findReusableCluster(key);
                            if (pbsCluster == null) {
                                JobManagerConfiguration jConfig = selectJobManagerConfiguration(
                                        sshJobSubmission.getResourceJobManager(), installedParentPath);
                                pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo, jConfig);
                                addToPool(key, pbsCluster);
                            }
                        }
                    } catch (Exception e) {
                        throw new GFacException("Error occurred...", e);
                    }
                    sshSecurityContext.setPbsCluster(pbsCluster);
                    jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), sshSecurityContext);
                }
            } catch (AppCatalogException e) {
                throw new GFacException("Error while getting SSH Submission object from app catalog", e);
            }
        }
    }

    /**
     * Adds a security context for a third-party resource described by {@code sshAuth}. The context is
     * registered under {@code sshAuth.getKey()}.
     *
     * @param jobExecutionContext context to read configuration from and to register the context in
     * @param sshAuth             server info, authentication info and pool key for the connection
     * @throws GFacException if {@code sshAuth} is null or the connection cannot be set up; every failure
     *                       is logged and wrapped as "Error adding security Context"
     * @throws ApplicationSettingsException part of the method signature
     */
    public static void addSecurityContext(JobExecutionContext jobExecutionContext, SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException {
        try {
            if (sshAuth == null) {
                throw new GFacException("Error adding security Context, because sshAuthWrapper is null");
            }
            SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
            JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
            SSHJobSubmission sshJobSubmission = null;
            try {
                sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
            } catch (Exception e1) {
                // best effort: without the submission object the cluster is created without a job manager config
                logger.error("Not able to get SSHJobSubmission from registry", e1);
            }

            Cluster pbsCluster;
            String key = sshAuth.getKey();
            synchronized (clusters) {
                pbsCluster = findReusableCluster(key);
                if (pbsCluster == null) {
                    JobManagerConfiguration jConfig = null;
                    if (sshJobSubmission != null) {
                        jConfig = selectJobManagerConfiguration(sshJobSubmission.getResourceJobManager(),
                                getInstalledParentPath(jobExecutionContext));
                    }
                    pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(), jConfig);
                    addToPool(key, pbsCluster);
                }
            }
            sshSecurityContext.setPbsCluster(pbsCluster);
            jobExecutionContext.addSecurityContext(key, sshSecurityContext);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new GFacException("Error adding security Context", e);
        }
    }

    /**
     * Returns a live pooled cluster for {@code key}, or null when the caller should open a new one.
     * A new connection is requested while the pool for the key holds fewer than
     * {@link #maxClusterCount} entries. Otherwise a random pooled entry is probed; a dead entry is
     * evicted and null is returned. Callers must hold the lock on {@link #clusters}.
     *
     * @throws Exception propagated from looking up the pooled cluster's session
     */
    private static Cluster findReusableCluster(String key) throws Exception {
        List<Cluster> pooled = clusters.get(key);
        if (pooled == null || pooled.size() < maxClusterCount) {
            return null;
        }
        // pooled.size() >= maxClusterCount here, so the index is always in range
        int index = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
        Cluster candidate = pooled.get(index);
        if (!candidate.getSession().isConnected()) {
            pooled.remove(index);
            return null;
        }
        try {
            // isConnected() is hard to trust, so probe the connection with a cheap remote call
            candidate.listDirectory("~/");
        } catch (Exception e) {
            pooled.remove(index);
            logger.info("Connection found the connection map is expired, so we create from the scratch");
            maxClusterCount++;
            return null;
        }
        logger.info("Re-using the same connection used with the connection string:" + key);
        return candidate;
    }

    /**
     * Appends {@code cluster} to the pool for {@code key}, creating the pool list if needed.
     * Callers must hold the lock on {@link #clusters}.
     */
    private static void addToPool(String key, Cluster cluster) {
        List<Cluster> pooled = clusters.get(key);
        if (pooled == null) {
            pooled = new ArrayList<Cluster>();
            clusters.put(key, pooled);
        }
        pooled.add(cluster);
    }

    /**
     * Returns the job manager bin path of the job's resource job manager, or "/" when no resource job
     * manager or bin path is configured.
     */
    private static String getInstalledParentPath(JobExecutionContext jobExecutionContext) {
        ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
        String installedParentPath = resourceJobManager == null ? null : resourceJobManager.getJobManagerBinPath();
        return installedParentPath == null ? "/" : installedParentPath;
    }

    /**
     * Picks the job manager configuration whose name matches the resource job manager type
     * (case-insensitively). Falls back to PBS when no type is configured and returns null for an
     * unrecognised type.
     */
    private static JobManagerConfiguration selectJobManagerConfiguration(ResourceJobManager resourceJobManager, String installedParentPath) {
        ResourceJobManagerType managerType = resourceJobManager == null ? null : resourceJobManager.getResourceJobManagerType();
        if (managerType == null) {
            logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
            return CommonUtils.getPBSJobManager(installedParentPath);
        }
        String jobManager = managerType.toString();
        if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
            return CommonUtils.getPBSJobManager(installedParentPath);
        } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
            return CommonUtils.getSLURMJobManager(installedParentPath);
        } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
            return CommonUtils.getUGEJobManager(installedParentPath);
        } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
            return CommonUtils.getLSFJobManager(installedParentPath);
        }
        return null;
    }

    /**
     * Builds the batch job descriptor for this execution. It sets notification e-mail settings, the
     * callback endpoint, directories, the executable and its command-line arguments, and scheduling
     * limits. It also adds module/pre/post commands and, for MPI/OpenMP applications, the job
     * submitter command of the resource job manager.
     *
     * @param jobExecutionContext source of all job settings
     * @param cluster             target cluster; it is cast to {@link GSISSHAbstractCluster} and
     *                            {@link PBSCluster} to read the login user name
     * @return the populated descriptor
     * @throws AppCatalogException          if the SSH job submission cannot be loaded
     * @throws ApplicationSettingsException if a server setting cannot be read
     */
    public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) throws AppCatalogException, ApplicationSettingsException {
        JobDescriptor jobDescriptor = new JobDescriptor();
        TaskDetails taskData = jobExecutionContext.getTaskData();

        // set email based job monitoring email address if monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR
        String emailIds = null;
        if (isEmailBasedJobMonitor(jobExecutionContext)) {
            emailIds = ServerSettings.getEmailBasedMonitorAddress();
        }
        // add all configured job notification email addresses.
        if ("true".equalsIgnoreCase(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE))) {
            String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
            if (flags != null && "stampede.tacc.xsede.org".equals(
                    jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName())) {
                flags = "ALL";
            }
            jobDescriptor.setMailOptions(flags);

            emailIds = appendCsv(emailIds, ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS));

            if (taskData.isEnableEmailNotification()) {
                List<String> emailList = taskData.getEmailAddresses();
                emailIds = appendCsv(emailIds, GFacUtils.listToCsv(emailList, ','));
            }
        }
        if (emailIds != null && !emailIds.isEmpty()) {
            logger.info("Email list: " + emailIds);
            jobDescriptor.setMailAddress(emailIds);
        }

        // this is common for any application descriptor
        jobDescriptor.setCallBackIp(ServerSettings.getIp());
        jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
        jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
        jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
        jobDescriptor.setExecutablePath(jobExecutionContext.getApplicationContext()
                .getApplicationDeploymentDescription().getExecutablePath());
        jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
        jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());

        // The task's project account wins; otherwise use the allocation from the compute resource preference.
        ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
        String computationalProjectAccount = taskScheduling == null ? null : taskScheduling.getComputationalProjectAccount();
        if (computationalProjectAccount == null) {
            ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
            if (computeResourcePreference != null) {
                computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
            }
        }
        if (computationalProjectAccount != null) {
            jobDescriptor.setAcountString(computationalProjectAccount);
        }
        // To make job name alpha numeric
        jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
        jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());

        jobDescriptor.setInputValues(buildCommandLineValues(jobExecutionContext));
        jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
        jobDescriptor.setShellName("/bin/bash");
        jobDescriptor.setAllEnvExport(true);
        jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());

        ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();

        if (taskScheduling != null) {
            int totalNodeCount = taskScheduling.getNodeCount();
            int totalCPUCount = taskScheduling.getTotalCPUCount();

            if (taskScheduling.getComputationalProjectAccount() != null) {
                jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
            }
            if (taskScheduling.getQueueName() != null) {
                jobDescriptor.setQueueName(taskScheduling.getQueueName());
            }
            if (totalNodeCount > 0) {
                jobDescriptor.setNodes(totalNodeCount);
            }
            if (totalCPUCount > 0) {
                // processes-per-node needs a node count; dividing by zero nodes would throw ArithmeticException
                if (totalNodeCount > 0) {
                    jobDescriptor.setProcessesPerNode(totalCPUCount / totalNodeCount);
                }
                jobDescriptor.setCPUCount(totalCPUCount);
            }
            if (taskScheduling.getWallTimeLimit() > 0) {
                jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
                if (resourceJobManager != null
                        && resourceJobManager.getResourceJobManagerType() == ResourceJobManagerType.LSF) {
                    jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
                }
            }
            if (taskScheduling.getTotalPhysicalMemory() > 0) {
                jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
            }
        } else {
            logger.error("Task scheduling cannot be null at this point..");
        }

        ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
        List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
        if (moduleCmds != null) {
            for (String moduleCmd : moduleCmds) {
                jobDescriptor.addModuleLoadCommands(moduleCmd);
            }
        }
        List<String> preJobCommands = appDepDescription.getPreJobCommands();
        if (preJobCommands != null) {
            for (String preJobCommand : preJobCommands) {
                jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
            }
        }
        List<String> postJobCommands = appDepDescription.getPostJobCommands();
        if (postJobCommands != null) {
            for (String postJobCommand : postJobCommands) {
                jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
            }
        }

        // Parallel applications are launched through the job manager's SUBMISSION command, if one is configured.
        ApplicationParallelismType parallelism = appDepDescription.getParallelism();
        boolean parallelApplication = parallelism == ApplicationParallelismType.MPI
                || parallelism == ApplicationParallelismType.OPENMP
                || parallelism == ApplicationParallelismType.OPENMP_MPI;
        if (parallelApplication && resourceJobManager != null) {
            Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
            if (jobManagerCommands != null && jobManagerCommands.containsKey(JobManagerCommand.SUBMISSION)) {
                jobDescriptor.setJobSubmitter(jobManagerCommands.get(JobManagerCommand.SUBMISSION));
            }
        }
        return jobDescriptor;
    }

    /**
     * Builds the command-line values of the job. First come the inputs flagged for the command line,
     * ordered by input order; each contributes its application argument and then its value. Then
     * come the outputs' application arguments and, for URI outputs flagged for the command line,
     * their file names. For URI values only the part after the last separator is passed.
     */
    private static List<String> buildCommandLineValues(JobExecutionContext jobExecutionContext) {
        List<String> values = new ArrayList<String>();
        MessageContext input = jobExecutionContext.getInMessageContext();

        // A stable list sort is used instead of a TreeSet, which would silently drop inputs that
        // share the same input order.
        List<InputDataObjectType> sortedInputs = new ArrayList<InputDataObjectType>();
        for (Object object : input.getParameters().values()) {
            if (object instanceof InputDataObjectType) {
                sortedInputs.add((InputDataObjectType) object);
            }
        }
        Collections.sort(sortedInputs, new Comparator<InputDataObjectType>() {
            @Override
            public int compare(InputDataObjectType first, InputDataObjectType second) {
                int a = first.getInputOrder();
                int b = second.getInputOrder();
                return a < b ? -1 : (a > b ? 1 : 0); // no subtraction, so no overflow
            }
        });
        for (InputDataObjectType inputDataObjectType : sortedInputs) {
            if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
                continue;
            }
            String argument = inputDataObjectType.getApplicationArgument();
            if (argument != null && !argument.equals("")) {
                values.add(argument);
            }
            String value = inputDataObjectType.getValue();
            if (value != null && !value.equals("")) {
                // set only the relative path for URI inputs
                values.add(inputDataObjectType.getType() == DataType.URI ? fileName(value) : value);
            }
        }

        Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
        for (Object outputParam : outputParams.values()) {
            if (outputParam instanceof OutputDataObjectType) {
                OutputDataObjectType output = (OutputDataObjectType) outputParam;
                String argument = output.getApplicationArgument();
                if (argument != null && !argument.equals("")) {
                    values.add(argument);
                }
                String value = output.getValue();
                if (value != null && !value.equals("") && output.isRequiredToAddedToCommandLine()
                        && output.getType() == DataType.URI) {
                    values.add(fileName(value));
                }
            }
        }
        return values;
    }

    /** Returns the part of {@code path} after the last {@link File#separatorChar}. */
    private static String fileName(String path) {
        return path.substring(path.lastIndexOf(File.separatorChar) + 1);
    }

    /**
     * Joins two comma-separated lists: returns {@code existing} when {@code addition} is null/empty,
     * {@code addition} when {@code existing} is null/empty, otherwise {@code existing + "," + addition}.
     */
    private static String appendCsv(String existing, String addition) {
        if (addition == null || addition.isEmpty()) {
            return existing;
        }
        if (existing == null || existing.isEmpty()) {
            return addition;
        }
        return existing + "," + addition;
    }

    /**
     * Returns true when the preferred submission protocol is SSH and its SSH job submission is
     * configured with the JOB_EMAIL_NOTIFICATION_MONITOR monitor mode.
     *
     * @throws AppCatalogException if the SSH job submission cannot be loaded
     */
    public static boolean isEmailBasedJobMonitor(JobExecutionContext jobExecutionContext) throws AppCatalogException {
        if (jobExecutionContext.getPreferredJobSubmissionProtocol() != JobSubmissionProtocol.SSH) {
            return false;
        }
        String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
        SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
        return sshJobSubmission.getMonitorMode() == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
    }

    /** Returns a random non-negative number used as the numeric part of the job name. */
    private static int generateJobName() {
        int i = new Random().nextInt(Integer.MAX_VALUE) + 99999999;
        if (i < 0) {
            // The addition may overflow. Negation restores a positive value, except for Integer.MIN_VALUE,
            // whose negation is itself and would put a '-' in the job name.
            i = (i == Integer.MIN_VALUE) ? Integer.MAX_VALUE : -i;
        }
        return i;
    }

    /**
     * Substitutes the $workingDir, $inputDir and $outputDir placeholders in {@code value} with the
     * job's directories, in that order.
     */
    private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
        String parsedValue = replacePlaceholder(value, "$workingDir", jobExecutionContext.getWorkingDir());
        parsedValue = replacePlaceholder(parsedValue, "$inputDir", jobExecutionContext.getInputDir());
        parsedValue = replacePlaceholder(parsedValue, "$outputDir", jobExecutionContext.getOutputDir());
        return parsedValue;
    }

    /**
     * Replaces every literal occurrence of {@code placeholder}. String.replace is used because
     * String.replaceAll would treat '$' and '\' inside directory paths as group references/escapes.
     * Text without the placeholder is returned untouched, so a null replacement only fails when it
     * is actually needed.
     */
    private static String replacePlaceholder(String text, String placeholder, String replacement) {
        return text.contains(placeholder) ? text.replace(placeholder, replacement) : text;
    }

    /**
     * Sets the security context for the given login if it is not set yet, so it can be used later.
     * The context is looked up and registered under the key {@code userName + hostName + port}.
     * NOTE(review): the port is part of the key but is not passed to {@link ServerInfo}; confirm the
     * server's default port is intended.
     *
     * @param jobExecutionContext context to register the security context in
     * @param authenticationInfo  authentication to use for the connection
     * @param userName            login user name
     * @param hostName            host to connect to
     * @param port                port used in the context key
     * @return the key under which the security context is registered
     * @throws GFacException if the security context cannot be created; error details are saved first
     */
    public static String prepareSecurityContext(JobExecutionContext jobExecutionContext, AuthenticationInfo authenticationInfo
            , String userName, String hostName, int port) throws GFacException {
        ServerInfo serverInfo = new ServerInfo(userName, hostName);
        String key = userName + hostName + port;
        SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key);
        if (jobExecutionContext.getSecurityContext(key) == null) {
            try {
                GFACSSHUtils.addSecurityContext(jobExecutionContext, sshAuthWrapper);
            } catch (ApplicationSettingsException e) {
                logger.error(e.getMessage(), e);
                try {
                    StringWriter errors = new StringWriter();
                    e.printStackTrace(new PrintWriter(errors));
                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                } catch (GFacException e1) {
                    logger.error(e1.getLocalizedMessage());
                }
                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
            }
        }
        return key;
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java index 0000000,73a6e4a..38981aa mode 000000,100644..100644 --- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java +++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java @@@ -1,0 -1,252 +1,252 @@@ + ///* + // * + // * Licensed to the Apache Software Foundation (ASF) under one + // * or more contributor license agreements. See the NOTICE file + // * distributed with this work for additional information + // * regarding copyright ownership.
The ASF licenses this file + // * to you under the Apache License, Version 2.0 (the + // * "License"); you may not use this file except in compliance + // * with the License. You may obtain a copy of the License at + // * + // * http://www.apache.org/licenses/LICENSE-2.0 + // * + // * Unless required by applicable law or agreed to in writing, + // * software distributed under the License is distributed on an + // * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + // * KIND, either express or implied. See the License for the + // * specific language governing permissions and limitations + // * under the License. + // * + //*/ + //package org.apache.airavata.core.gfac.services.impl; + // + //import org.apache.airavata.commons.gfac.type.ActualParameter; + //import org.apache.airavata.commons.gfac.type.ApplicationDescription; + //import org.apache.airavata.commons.gfac.type.HostDescription; + //import org.apache.airavata.commons.gfac.type.ServiceDescription; + //import org.apache.airavata.gfac.GFacConfiguration; + //import org.apache.airavata.gfac.GFacException; + //import org.apache.airavata.gfac.SecurityContext; + //import org.apache.airavata.gfac.core.context.ApplicationContext; + //import org.apache.airavata.gfac.core.context.JobExecutionContext; + //import org.apache.airavata.gfac.core.context.MessageContext; + //import org.apache.airavata.gfac.impl.BetterGfacImpl; + //import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; + //import org.apache.airavata.gfac.ssh.api.Cluster; + //import org.apache.airavata.gfac.ssh.api.SSHApiException; + //import org.apache.airavata.gfac.ssh.api.ServerInfo; + //import AuthenticationInfo; + //import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration; + //import org.apache.airavata.gfac.ssh.impl.PBSCluster; + //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; + //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; + //import 
org.apache.airavata.gfac.ssh.util.CommonUtils; + //import org.apache.airavata.model.workspace.experiment.TaskDetails; -//import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; ++//import org.apache.airavata.experiment.registry.jpa.impl.RegistryFactory; + //import org.apache.airavata.schemas.gfac.*; + //import org.testng.annotations.BeforeClass; + //import org.testng.annotations.Test; + // + //import java.io.File; + //import java.net.URL; + //import java.util.ArrayList; + //import java.util.Date; + //import java.util.List; + //import java.util.UUID; + // + //public class BigRed2TestWithSSHAuth { + // private JobExecutionContext jobExecutionContext; + // + // private String userName; + // private String password; + // private String passPhrase; + // private String hostName; + // private String workingDirectory; + // private String privateKeyPath; + // private String publicKeyPath; + // + // @BeforeClass + // public void setUp() throws Exception { + // + // System.out.println("Test case name " + this.getClass().getName()); + //// System.setProperty("ssh.host","bigred2.uits.iu.edu"); //default ssh host + //// System.setProperty("ssh.user", "lginnali"); + //// System.setProperty("ssh.private.key.path", "/Users/lahirugunathilake/.ssh/id_dsa"); + //// System.setProperty("ssh.public.key.path", "/Users/lahirugunathilake/.ssh/id_dsa.pub"); + //// System.setProperty("ssh.working.directory", "/tmp"); + // + // this.hostName = "bigred2.uits.iu.edu"; + // this.hostName = System.getProperty("ssh.host"); + // this.userName = System.getProperty("ssh.username"); + // this.password = System.getProperty("ssh.password"); + // this.privateKeyPath = System.getProperty("private.ssh.key"); + // this.publicKeyPath = System.getProperty("public.ssh.key"); + // this.passPhrase = System.getProperty("ssh.keypass"); + // this.workingDirectory = System.getProperty("ssh.working.directory"); + // + // + // if (this.userName == null + // || (this.password==null && 
(this.publicKeyPath == null || this.privateKeyPath == null)) || this.workingDirectory == null) { + // System.out.println("########### In order to test you have to either username password or private,public keys"); + // System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " + + // "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory "); + // } + // URL resource = BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); + // assert resource != null; + // System.out.println(resource.getFile()); + // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null); + // + //// gFacConfiguration.setMyProxyLifeCycle(3600); + //// gFacConfiguration.setMyProxyServer("myproxy.teragrid.org"); + //// gFacConfiguration.setMyProxyUser("*****"); + //// gFacConfiguration.setMyProxyPassphrase("*****"); + //// gFacConfiguration.setTrustedCertLocation("./certificates"); + //// //have to set InFlwo Handlers and outFlowHandlers + //// gFacConfiguration.setInHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GramDirectorySetupHandler","org.apache.airavata.gfac.handler.GridFTPInputHandler"})); + //// gFacConfiguration.setOutHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GridFTPOutputHandler"})); + // + // /* + // * Host + // */ + // HostDescription host = new HostDescription(SSHHostType.type); + // host.getType().setHostAddress(hostName); + // host.getType().setHostName(hostName); + // ((SSHHostType)host.getType()).setHpcResource(true); + // /* + // * App + // */ + // ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type); + // HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType(); + // ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance(); + // 
name.setStringValue("EchoLocal"); + // app.setApplicationName(name); + // + // app.setCpuCount(1); + // app.setJobType(JobTypeType.SERIAL); + // app.setNodeCount(1); + // app.setProcessorsPerNode(1); + // + // /* + // * Use bat file if it is compiled on Windows + // */ + // app.setExecutableLocation("/bin/echo"); + // + // /* + // * Default tmp location + // */ + // String tempDir = "/tmp"; + // String date = (new Date()).toString(); + // date = date.replaceAll(" ", "_"); + // date = date.replaceAll(":", "_"); + // + // tempDir = tempDir + File.separator + // + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID(); + // + // System.out.println(tempDir); + // app.setScratchWorkingDirectory(tempDir); + // app.setStaticWorkingDirectory(tempDir); + // app.setInputDataDirectory(tempDir + File.separator + "inputData"); + // app.setOutputDataDirectory(tempDir + File.separator + "outputData"); + // app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout"); + // app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr"); + // app.setMaxWallTime(5); + // app.setJobSubmitterCommand("aprun -n 1"); + // app.setInstalledParentPath("/opt/torque/torque-4.2.3.1/bin/"); + // + // /* + // * Service + // */ + // ServiceDescription serv = new ServiceDescription(); + // serv.getType().setName("SimpleEcho"); + // + // List<InputParameterType> inputList = new ArrayList<InputParameterType>(); + // + // InputParameterType input = InputParameterType.Factory.newInstance(); + // input.setParameterName("echo_input"); + // input.setParameterType(StringParameterType.Factory.newInstance()); + // inputList.add(input); + // + // InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList + // + // .size()]); + // List<OutputParameterType> outputList = new ArrayList<OutputParameterType>(); + // OutputParameterType output = OutputParameterType.Factory.newInstance(); + // 
output.setParameterName("echo_output"); + // output.setParameterType(StringParameterType.Factory.newInstance()); + // outputList.add(output); + // + // OutputParameterType[] outputParamList = outputList + // .toArray(new OutputParameterType[outputList.size()]); + // + // serv.getType().setInputParametersArray(inputParamList); + // serv.getType().setOutputParametersArray(outputParamList); + // + // jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName()); + // // Adding security context + // jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, getSecurityContext(app)); + // ApplicationContext applicationContext = new ApplicationContext(); + // jobExecutionContext.setApplicationContext(applicationContext); + // applicationContext.setServiceDescription(serv); + // applicationContext.setApplicationDeploymentDescription(appDesc); + // applicationContext.setHostDescription(host); + // + // MessageContext inMessage = new MessageContext(); + // ActualParameter echo_input = new ActualParameter(); + // ((StringParameterType) echo_input.getType()).setValue("echo_output=hello"); + // inMessage.addParameter("echo_input", echo_input); + // + // + // jobExecutionContext.setInMessageContext(inMessage); + // + // MessageContext outMessage = new MessageContext(); + // ActualParameter echo_out = new ActualParameter(); + //// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello"); + // outMessage.addParameter("echo_output", echo_out); + // jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry()); + // jobExecutionContext.setTaskData(new TaskDetails("11323")); + // jobExecutionContext.setOutMessageContext(outMessage); + // + // } + // + // + // private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) { + // try { + // + // AuthenticationInfo authenticationInfo = null; + // if (password != null) { + // authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); 
+ // } else { + // authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, + // this.passPhrase); + // } + // // Server info + // ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + // + // Cluster pbsCluster = null; + // SSHSecurityContext sshSecurityContext = null; + // + // JobManagerConfiguration pbsJobManager = CommonUtils.getPBSJobManager(app.getInstalledParentPath()); + // pbsCluster = new PBSCluster(serverInfo, authenticationInfo, pbsJobManager); + // + // + // sshSecurityContext = new SSHSecurityContext(); + // sshSecurityContext.setPbsCluster(pbsCluster); + // sshSecurityContext.setUsername(userName); + // sshSecurityContext.setKeyPass(passPhrase); + // sshSecurityContext.setPrivateKeyLoc(privateKeyPath); + // return sshSecurityContext; + // } catch (SSHApiException e) { + // e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + // } + // return null; + // } + // + // @Test + // public void testSSHProvider() throws GFacException { + // BetterGfacImpl gFacAPI = new BetterGfacImpl(); + // gFacAPI.submitJob(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID()); + // org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobDescription()); + // org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobID()); + // } + // + //}
