http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java deleted file mode 100644 index f027ccb..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.core.utils; - -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.cpi.GFac; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.TaskState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; - -public class OutHandlerWorker implements Runnable { - private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class); - - private GFac gfac; - - private MonitorID monitorID; - - private MonitorPublisher monitorPublisher; - private JobExecutionContext jEC; - - public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) { - this.gfac = gfac; - this.monitorID = monitorID; - this.monitorPublisher = monitorPublisher; - this.jEC = monitorID.getJobExecutionContext(); - } - - public OutHandlerWorker(JobExecutionContext jEC) { - this.jEC = jEC; - this.gfac = jEC.getGfac(); - this.monitorPublisher = jEC.getMonitorPublisher(); - } - - @Override - public void run() { - try { -// gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext()); - gfac.invokeOutFlowHandlers(jEC); - } catch (Exception e) { - logger.error(e.getMessage(),e); - TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID()); - //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status - monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier)); - try { - StringWriter errors = new StringWriter(); - e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(monitorID.getJobExecutionContext(), errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - logger.error("Error while persisting error details", e); - } - logger.info(e.getLocalizedMessage(), e); - // Save error details to registry - - } -// monitorPublisher.publish(monitorID.getStatus()); - monitorPublisher.publish(jEC.getJobDetails().getJobStatus()); - - } -}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java deleted file mode 100644 index 3c8bbf0..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutputUtils.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.core.utils; - -import org.apache.airavata.common.utils.StringUtil; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class OutputUtils { - private static String regexPattern = "\\s*=\\s*(.*)\\r?\\n"; - - public static void fillOutputFromStdout(Map<String, Object> output, String stdout, String stderr, List<OutputDataObjectType> outputArray) throws Exception { - // this is no longer correct -// if (stdout == null || stdout.equals("")) { -// throw new GFacHandlerException("Standard output is empty."); -// } - - Set<String> keys = output.keySet(); - OutputDataObjectType actual = null; - OutputDataObjectType resultOutput = null; - for (String paramName : keys) { - actual = (OutputDataObjectType) output.get(paramName); - // if parameter value is not already set, we let it go - - if (actual == null) { - continue; - } - resultOutput = new OutputDataObjectType(); - if (DataType.STDOUT == actual.getType()) { - actual.setValue(stdout); - resultOutput.setName(paramName); - resultOutput.setType(DataType.STDOUT); - resultOutput.setValue(stdout); - outputArray.add(resultOutput); - } else if (DataType.STDERR == actual.getType()) { - actual.setValue(stderr); - resultOutput.setName(paramName); - resultOutput.setType(DataType.STDERR); - resultOutput.setValue(stderr); - outputArray.add(resultOutput); - } -// else if ("URI".equals(actual.getType().getType().toString())) { -// continue; -// } - else { - String parseStdout = parseStdout(stdout, paramName); - if (parseStdout != null) { - actual.setValue(parseStdout); - resultOutput.setName(paramName); - resultOutput.setType(DataType.STRING); - resultOutput.setValue(parseStdout); - outputArray.add(resultOutput); - } - } - } - } - - private static String parseStdout(String stdout, String outParam) throws Exception { - String regex = Pattern.quote(outParam) + regexPattern; - String match = null; - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(stdout); - while (matcher.find()) { - match = matcher.group(1); - } - if (match != null) { - match = match.trim(); - return match; - } - return null; - } - - public static String[] parseStdoutArray(String stdout, String outParam) throws Exception { - String regex = Pattern.quote(outParam) + regexPattern; - StringBuffer match = new StringBuffer(); - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(stdout); - while (matcher.find()) { - match.append(matcher.group(1) + StringUtil.DELIMETER); - } - if (match != null && match.length() >0) { - return StringUtil.getElementsFromString(match.toString()); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/pom.xml b/modules/gfac/gfac-gsissh/pom.xml deleted file mode 100644 index 81c3ec9..0000000 --- a/modules/gfac/gfac-gsissh/pom.xml +++ /dev/null @@ -1,117 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file - distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under - the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may - obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to - in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF - ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under - the License. --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.airavata</groupId> - <artifactId>gfac</artifactId> - <version>0.16-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>airavata-gfac-gsissh</artifactId> - <name>Airavata GFac GSI-SSH implementation</name> - <description>This is the extension of </description> - <url>http://airavata.apache.org/</url> - - <dependencies> - - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-email-monitor</artifactId> - <version>${project.version}</version> - </dependency> - <!-- Logging --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <!-- GFAC dependencies --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-ssh</artifactId> - <version>${project.version}</version> - </dependency> - <!-- Credential Store --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-credential-store</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-server-configuration</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-client-configuration</artifactId> - <scope>test</scope> - </dependency> - - - <!-- Test --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.testng</groupId> - <artifactId>testng</artifactId> - <version>6.1.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>test</scope> - </dependency> - - <!-- gsi-ssh api dependencies --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>gsissh</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-data-models</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.jcraft</groupId> - <artifactId>jsch</artifactId> - <version>0.1.50</version> - </dependency> - <dependency> - <groupId>org.apache.xmlbeans</groupId> - <artifactId>xmlbeans</artifactId> - <version>${xmlbeans.version}</version> - </dependency> - <dependency> - <groupId>net.schmizz</groupId> - <artifactId>sshj</artifactId> - <version>0.6.1</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java deleted file mode 100644 index b4790c7..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.gsissh.handler; - -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.model.workspace.experiment.*; -import org.apache.airavata.registry.cpi.ChildDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Properties; - -public class GSISSHDirectorySetupHandler extends AbstractHandler { - private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class); - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - try { - String hostAddress = jobExecutionContext.getHostName(); - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - } catch (Exception e) { - try { - StringWriter errors = new StringWriter(); - e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - - log.info("Setup SSH job directorties"); - super.invoke(jobExecutionContext); - makeDirectory(jobExecutionContext); - } - private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - Cluster cluster = null; - try { - String hostAddress = jobExecutionContext.getHostName(); - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); - if (cluster == null) { - try { - GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - - String workingDirectory = jobExecutionContext.getWorkingDir(); - cluster.makeDirectory(workingDirectory); - if(!jobExecutionContext.getInputDir().equals(workingDirectory)) - cluster.makeDirectory(jobExecutionContext.getInputDir()); - if(!jobExecutionContext.getOutputDir().equals(workingDirectory)) - cluster.makeDirectory(jobExecutionContext.getOutputDir()); - - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - status.setTransferState(TransferState.DIRECTORY_SETUP); - detail.setTransferStatus(status); - detail.setTransferDescription("Working directory = " + workingDirectory); - - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - } catch (Exception e) { - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir()); - status.setTransferState(TransferState.FAILED); - detail.setTransferStatus(status); - try { - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); - } catch (Exception e1) { - throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e); - } - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.invoke(jobExecutionContext); - } - - public void initProperties(Properties properties) throws GFacHandlerException { - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java deleted file mode 100644 index 3b36e86..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.gsissh.handler; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.DataTransferDetails; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.TransferState; -import org.apache.airavata.model.workspace.experiment.TransferStatus; -import org.apache.airavata.registry.cpi.ChildDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -/** - * Recoverability for this handler assumes the same input values will come in the second - * run, and assume nobody is changing registry during the original submission and re-submission - */ -public class GSISSHInputHandler extends AbstractHandler { - private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class); - - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - int index = 0; - int oldIndex = 0; - List<String> oldFiles = new ArrayList<String>(); - MessageContext inputNew = new MessageContext(); - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - StringBuffer data = new StringBuffer("|"); - Cluster cluster = null; - - try { - String hostAddress = jobExecutionContext.getHostName(); - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); - if (cluster == null) { - throw new GFacException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - - String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); - if (pluginData != null) { - try { - oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); - oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); - if (oldIndex == oldFiles.size()) { - log.info("Old data looks good !!!!"); - } else { - oldIndex = 0; - oldFiles.clear(); - } - } catch (NumberFormatException e) { - log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); - } - } - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - try { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } catch (ApplicationSettingsException e) { - log.error(e.getMessage()); - try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - } - log.info("Invoking SCPInputHandler"); - - MessageContext input = jobExecutionContext.getInMessageContext(); - Set<String> parameters = input.getParameters().keySet(); - for (String paramName : parameters) { - InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); - String paramValue = inputParamType.getValue(); - //TODO: Review this with type - if (inputParamType.getType() == DataType.URI) { - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - inputParamType.setValue(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue); - inputParamType.setValue(stageInputFile); - StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFile); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - } // FIXME: what is the thrift model DataType equivalent for URIArray type? -// else if ("URIArray".equals(inputParamType.getType().getType().toString())) { -// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); -// List<String> newFiles = new ArrayList<String>(); -// for (String paramValueEach : split) { -// if (index < oldIndex) { -// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); -// newFiles.add(oldFiles.get(index)); -// data.append(oldFiles.get(index++)).append(","); -// } else { -// String stageInputFiles = stageInputFiles(cluster, jobExecutionContext, paramValueEach); -// status.setTransferState(TransferState.UPLOAD); -// detail.setTransferStatus(status); -// detail.setTransferDescription("Input Data Staged: " + stageInputFiles); -// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); -// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); -// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); -// newFiles.add(stageInputFiles); -// } -// -// } -// ((URIArrayType) inputParamType.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); -// } - inputNew.getParameters().put(paramName, inputParamType); - } - } catch (Exception e) { - log.error(e.getMessage()); - status.setTransferState(TransferState.FAILED); - detail.setTransferDescription(e.getLocalizedMessage()); - detail.setTransferStatus(status); - try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - } catch (Exception e1) { - throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - } - jobExecutionContext.setInMessageContext(inputNew); - } - - private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { - int i = paramValue.lastIndexOf(File.separator); - String substring = paramValue.substring(i + 1); - try { - String targetFile = jobExecutionContext.getInputDir() + File.separator + substring; - if (paramValue.startsWith("file")) { - paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); - } - boolean success = false; - int j = 1; - while(!success){ - try { - cluster.scpTo(targetFile, paramValue); - success = true; - } catch (Exception e) { - log.info(e.getLocalizedMessage()); - Thread.sleep(2000); - if(j==3) { - throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - } - } - j++; - } - return targetFile; - } catch (Exception e) { - throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - } - } - - public void initProperties(Properties properties) throws GFacHandlerException { - - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.invoke(jobExecutionContext); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java deleted file mode 100644 index 18dcb97..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.gsissh.handler; - -//import org.apache.airavata.commons.gfac.type.ActualParameter; -//import org.apache.airavata.commons.gfac.type.MappingFactory; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.core.utils.OutputUtils; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.DataTransferDetails; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.TransferState; -import org.apache.airavata.model.workspace.experiment.TransferStatus; -import org.apache.airavata.registry.cpi.ChildDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -public class GSISSHOutputHandler extends AbstractHandler { - private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class); - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - int index = 0; - int oldIndex = 0; - List<String> oldFiles = new ArrayList<String>(); - StringBuffer data = new StringBuffer("|"); - String hostAddress = jobExecutionContext.getHostName(); - try { - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - } catch (Exception e) { - try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - log.error(e.getMessage()); - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - - Cluster cluster = null; - - try { - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); - if (cluster == null) { - GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); - - throw new GFacProviderException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - - // Get the Stdouts and StdErrs - String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); - if (pluginData != null) { - try { - oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); - oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); - if (oldIndex == oldFiles.size()) { - log.info("Old data looks good !!!!"); - } else { - oldIndex = 0; - oldFiles.clear(); - } - } catch (NumberFormatException e) { - log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); - } - } - - String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID()); - - TaskDetails taskData = jobExecutionContext.getTaskData(); - String outputDataDir = null; - File localStdOutFile; - File localStdErrFile; - //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work -// if (taskData.getAdvancedOutputDataHandling() != null) { -// outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); -// } - if (outputDataDir == null) { - outputDataDir = File.separator + "tmp"; - } - outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID(); - (new File(outputDataDir)).mkdirs(); - - String stdOutStr = ""; - if (index < oldIndex) { - localStdOutFile = new File(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); - } else { - int i = 0; - localStdOutFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stdout"); - while(stdOutStr.isEmpty()){ - try { - cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath()); - stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); - } catch (Exception e) { - log.error(e.getLocalizedMessage()); - Thread.sleep(2000); - } - i++; - if(i==3)break; - } - - StringBuffer temp = new StringBuffer(data.append(localStdOutFile.getAbsolutePath()).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - if (index < oldIndex) { - localStdErrFile = new File(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); - } else { - localStdErrFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stderr"); - cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath()); - StringBuffer temp = new StringBuffer(data.append(localStdErrFile.getAbsolutePath()).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - - String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); - status.setTransferState(TransferState.STDOUT_DOWNLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath()); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - status.setTransferState(TransferState.STDERROR_DOWNLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath()); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - //todo this is a mess we have to fix this - List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); - Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); - Set<String> keys = output.keySet(); - for (String paramName : keys) { - OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName); - if (DataType.URI == outputDataObjectType.getType()) { - - List<String> outputList = null; - int retry=3; - while(retry>0){ - outputList = cluster.listDirectory(jobExecutionContext.getOutputDir()); - if (outputList.size() == 1 && outputList.get(0).isEmpty()) { - Thread.sleep(10000); - } else if (outputList.size() > 0) { - break; - }else{ - Thread.sleep(10000); - } - retry--; - if(retry==0){ - } - Thread.sleep(10000); - } - if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) { - OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); - Set<String> strings = output.keySet(); - outputArray.clear(); - for (String key : strings) { - OutputDataObjectType outputDataObjectType1 = (OutputDataObjectType) output.get(key); - if (DataType.URI == outputDataObjectType1.getType()) { - String downloadFile = outputDataObjectType1.getValue(); - String localFile; - if (index < oldIndex) { - localFile = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); - } else { - cluster.scpFrom(downloadFile, outputDataDir); - String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); - localFile = outputDataDir + File.separator + fileName; - StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - jobExecutionContext.addOutputFile(localFile); - outputDataObjectType1.setValue(localFile); - OutputDataObjectType dataObjectType = new OutputDataObjectType(); - dataObjectType.setValue(localFile); - dataObjectType.setName(key); - dataObjectType.setType(DataType.URI); - outputArray.add(dataObjectType); - }else if (DataType.STDOUT == outputDataObjectType1.getType()) { - String localFile; - if (index < oldIndex) { - localFile = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); - } else { - String fileName = localStdOutFile.getName(); - localFile = outputDataDir + File.separator + fileName; - StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - jobExecutionContext.addOutputFile(localFile); - outputDataObjectType1.setValue(localFile); - OutputDataObjectType dataObjectType = new OutputDataObjectType(); - dataObjectType.setValue(localFile); - dataObjectType.setName(key); - dataObjectType.setType(DataType.STDOUT); - outputArray.add(dataObjectType); - }else if (DataType.STDERR == outputDataObjectType1.getType()) { - String localFile; - if (index < oldIndex) { - localFile = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); - } else { - String fileName = localStdErrFile.getName(); - localFile = outputDataDir + File.separator + fileName; - StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - jobExecutionContext.addOutputFile(localFile); - outputDataObjectType1.setValue(localFile); - OutputDataObjectType dataObjectType = new OutputDataObjectType(); - dataObjectType.setValue(localFile); - dataObjectType.setName(key); - dataObjectType.setType(DataType.STDERR); - outputArray.add(dataObjectType); - } - } - break; - } else if(outputList.size() == 1) { //FIXME: this is ultrascan specific - String valueList = outputList.get(0); - String outputFile; - if (index < oldIndex) { - outputFile = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); - } else { - cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir); - outputFile = outputDataDir + File.separator + valueList; - jobExecutionContext.addOutputFile(outputFile); - StringBuffer temp = new StringBuffer(data.append(outputFile).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - jobExecutionContext.addOutputFile(outputFile); - outputDataObjectType.setValue(outputFile); - OutputDataObjectType dataObjectType = new OutputDataObjectType(); - dataObjectType.setValue(valueList); - dataObjectType.setName(paramName); - dataObjectType.setType(DataType.URI); - outputArray.add(dataObjectType); - } - } else { - OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); -// break; - } - } - if (outputArray == null || outputArray.isEmpty()) { - if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null){ - throw new GFacHandlerException( - "Empty Output returned from the Application, Double check the application" - + "and ApplicationDescriptor output Parameter Names" - ); - } - } - // Why we set following? - jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath()); - jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath()); - jobExecutionContext.setOutputDir(outputDataDir); - status.setTransferState(TransferState.DOWNLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription(outputDataDir); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - fireTaskOutputChangeEvent(jobExecutionContext, outputArray); - } catch (Exception e) { - try { - status.setTransferState(TransferState.FAILED); - detail.setTransferStatus(status); - detail.setTransferDescription(e.getLocalizedMessage()); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); - } catch (Exception e1) { - throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error in retrieving results", e); - } - } - - public void initProperties(Properties properties) throws GFacHandlerException { - - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.invoke(jobExecutionContext); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java deleted file mode 100644 index ed94312..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.apache.airavata.gfac.gsissh.handler; - -import java.util.List; -import java.util.Properties; - -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; -import org.apache.airavata.gfac.ssh.util.HandleOutputs; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.registry.cpi.ChildDataType; -import org.apache.airavata.registry.cpi.RegistryException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NewGSISSHOutputHandler extends AbstractHandler{ - private static final Logger log = LoggerFactory.getLogger(NewGSISSHOutputHandler.class); - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - String hostAddress = jobExecutionContext.getHostName(); - try { - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - } catch (Exception e) { - try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - log.error(e.getMessage()); - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - Cluster cluster = null; - - try { - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); - if (cluster == null) { - GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); - - throw new GFacProviderException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - } catch (Exception e) { - log.error(e.getMessage()); - try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - - super.invoke(jobExecutionContext); - List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster); - try { - registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - } catch (RegistryException e) { - throw new GFacHandlerException(e); - } - } - - @Override - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - // TODO: Auto generated method body. - } - - @Override - public void initProperties(Properties properties) throws GFacHandlerException { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java deleted file mode 100644 index 36aac4c..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.gsissh.provider.impl; - -import org.airavata.appcatalog.cpi.AppCatalogException; -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.gfac.ExecutionMode; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; -import org.apache.airavata.gfac.core.provider.AbstractProvider; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; -import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; -import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; -import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; -import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; -import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; -import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; -import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.JobDetails; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.List; -import java.util.Map; - -//import org.apache.airavata.schemas.gfac.GsisshHostType; - -public class GSISSHProvider extends AbstractProvider { - private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class); - - public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { - - } - - public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - super.initialize(jobExecutionContext); - try { - String hostAddress = jobExecutionContext.getHostName(); - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - } catch (ApplicationSettingsException e) { - log.error(e.getMessage()); - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } catch (GFacException e) { - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - } - - public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - log.info("Invoking GSISSH Provider Invoke ..."); - StringBuffer data = new StringBuffer(); - jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); - ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() - .getComputeResourceDescription(); - ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext() - .getApplicationDeploymentDescription(); - JobDetails jobDetails = new JobDetails(); - Cluster cluster = null; - - try { - if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) { - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster(); - } - if (cluster == null) { - throw new GFacProviderException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - // This installed path is a mandetory field, because this could change based on the computing resource - JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster); - jobDetails.setJobName(jobDescriptor.getJobName()); - - log.info(jobDescriptor.toXML()); - data.append("jobDesc=").append(jobDescriptor.toXML()); - jobDetails.setJobDescription(jobDescriptor.toXML()); - String jobID = cluster.submitBatchJob(jobDescriptor); - jobExecutionContext.setJobDetails(jobDetails); - if (jobID == null) { - jobDetails.setJobID("none"); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); - } else { - jobDetails.setJobID(jobID.split("\\.")[0]); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); - } - data.append(",jobId=").append(jobDetails.getJobID()); - - // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler - // to perform monitoring, daemon handlers can be accessed from anywhere - monitor(jobExecutionContext); - // we know this host is type GsiSSHHostType - } catch (Exception e) { - String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage(); - log.error(error); - jobDetails.setJobID("none"); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); - StringWriter errors = new StringWriter(); - e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - throw new GFacProviderException(error, e); - } finally { - log.info("Saving data for future recovery: "); - log.info(data.toString()); - GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName()); - } - - } - - public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException { -/* List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - if (daemonHandlers == null) { - daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - } - ThreadedHandler pullMonitorHandler = null; - ThreadedHandler pushMonitorHandler = null; - MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); - for (ThreadedHandler threadedHandler : daemonHandlers) { - if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { - pullMonitorHandler = threadedHandler; - if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { - jobExecutionContext.setProperty("cancel","true"); - pullMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + - " to handle by the GridPullMonitorHandler"); - } - } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { - pushMonitorHandler = threadedHandler; - if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { - pushMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + - " to handle by the GridPushMonitorHandler"); - } - } - // have to handle the GridPushMonitorHandler logic - } - if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { - log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + - ", execution is configured as asynchronous, so Outhandler will not be invoked"); - }*/ - } - - public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { - //To change body of implemented methods use File | Settings | File Templates. - log.info("canceling the job status in GSISSHProvider!!!!!"); - JobDetails jobDetails = jobExecutionContext.getJobDetails(); - String hostName = jobExecutionContext.getHostName(); - try { - Cluster cluster = null; - if (jobExecutionContext.getSecurityContext(hostName) == null) { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } - cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getPbsCluster(); - if (cluster == null) { - throw new GFacProviderException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } - // This installed path is a mandetory field, because this could change based on the computing resource - if(jobDetails == null) { - log.error("There is not JobDetails so cancelations cannot perform !!!"); - return false; - } - if (jobDetails.getJobID() != null) { - // if this operation success without any exceptions, we can assume cancel operation succeeded. - cluster.cancelJob(jobDetails.getJobID()); - } else { - log.error("No Job Id is set, so cannot perform the cancel operation !!!"); - return false; - } - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); - return true; - // we know this host is type GsiSSHHostType - } catch (SSHApiException e) { - String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); - log.error(error); - jobDetails.setJobID("none"); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - throw new GFacProviderException(error, e); - } catch (Exception e) { - String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); - log.error(error); - jobDetails.setJobID("none"); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); - GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - throw new GFacProviderException(error, e); - } - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { - // have to implement the logic to recover a gfac failure - log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID()); - ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() - .getComputeResourceDescription(); - String hostName = jobExecutionContext.getHostName(); - String jobId = ""; - String jobDesc = ""; - try { - String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); - String[] split = pluginData.split(","); - if (split.length < 2) { - try { - this.execute(jobExecutionContext); - } catch (GFacException e) { - log.error("Error while recovering provider", e); - throw new GFacProviderException("Error recovering provider", e); - } - return; - } - jobDesc = split[0].substring(7); - jobId = split[1].substring(6); - - log.info("Following data have recovered: "); - log.info("Job Description: " + jobDesc); - log.info("Job Id: " + jobId); - if (jobId == null || "none".equals(jobId) || - "".equals(jobId)) { - try { - this.execute(jobExecutionContext); - } catch (GFacException e) { - log.error("Error while recovering provider", e); - throw new GFacProviderException("Error recovering provider", e); - } - return; - } - } catch (Exception e) { - log.error("Error while recovering provider", e); - } - try { - // Now we are we have enough data to recover - JobDetails jobDetails = new JobDetails(); - jobDetails.setJobDescription(jobDesc); - jobDetails.setJobID(jobId); - jobExecutionContext.setJobDetails(jobDetails); - if (jobExecutionContext.getSecurityContext(hostName) == null) { - try { - GFACGSISSHUtils.addSecurityContext(jobExecutionContext); - } catch (ApplicationSettingsException e) { - log.error(e.getMessage()); - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); - } - } - monitor(jobExecutionContext); - } catch (Exception e) { - log.error("Error while recover the job", e); - throw new GFacProviderException("Error delegating already ran job to Monitoring", e); - } - } - - @Override - public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); - SSHJobSubmission sshJobSubmission = null; - try { - sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); - } catch (AppCatalogException e) { - throw new GFacException("Error while reading compute resource", e); - } - if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { - MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); - if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { - try { - EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor( - sshJobSubmission.getResourceJobManager().getResourceJobManagerType()); - emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); - } catch (AiravataException e) { - throw new GFacHandlerException("Error while activating email job monitoring ", e); - } - return; - } - } -/* - // if email monitor is not activeated or not configure we use pull or push monitor - List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - if (daemonHandlers == null) { - daemonHandlers = BetterGfacImpl.getDaemonHandlers(); - } - ThreadedHandler pullMonitorHandler = null; - ThreadedHandler pushMonitorHandler = null; - MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); - String jobID = jobExecutionContext.getJobDetails().getJobID(); - for (ThreadedHandler threadedHandler : daemonHandlers) { - if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { - pullMonitorHandler = threadedHandler; - if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { - log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); - pullMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + - " to handle by the GridPullMonitorHandler"); - } - } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { - pushMonitorHandler = threadedHandler; - if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { - log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); - pushMonitorHandler.invoke(jobExecutionContext); - } else { - log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + - " to handle by the GridPushMonitorHandler"); - } - } - // have to handle the GridPushMonitorHandler logic - } - if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { - log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + - ", execution is configured as asynchronous, so Outhandler will not be invoked"); - - }*/ - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java deleted file mode 100644 index 46e7acd..0000000 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.gsissh.security; - -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.credential.store.credential.Credential; -import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; -import org.apache.airavata.credential.store.store.CredentialReader; -import org.apache.airavata.gfac.AbstractSecurityContext; -import org.apache.airavata.gfac.Constants; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.RequestData; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.globus.gsi.X509Credential; -import org.globus.gsi.gssapi.GlobusGSSCredentialImpl; -import org.globus.gsi.provider.GlobusProvider; -import org.globus.myproxy.GetParams; -import org.globus.myproxy.MyProxy; -import org.globus.myproxy.MyProxyException; -import org.gridforum.jgss.ExtendedGSSCredential; -import org.ietf.jgss.GSSCredential; -import org.ietf.jgss.GSSException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.security.Security; -import java.security.cert.X509Certificate; - -/** - * Handles GRID related security. - */ -public class GSISecurityContext extends AbstractSecurityContext { - - protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class); - /* - * context name - */ - - private Cluster pbsCluster = null; - - - public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) { - super(credentialReader, requestData); - this.pbsCluster = pbsCluster; - } - - - public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) { - super(credentialReader, requestData); - } - - - public GSISecurityContext(Cluster pbsCluster) { - this.setPbsCluster(pbsCluster); - } - - - - public Cluster getPbsCluster() { - return pbsCluster; - } - - public void setPbsCluster(Cluster pbsCluster) { - this.pbsCluster = pbsCluster; - } -}
