update outputs to database
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dad1204a Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dad1204a Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dad1204a Branch: refs/heads/master Commit: dad1204a779f185a533d7f6e9f6eb4d82c60ee19 Parents: 1ea272a Author: Chathuri Wimalasena <[email protected]> Authored: Thu Sep 3 15:07:29 2015 -0400 Committer: Supun Nakandala <[email protected]> Committed: Sat Sep 5 12:24:21 2015 +0530 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 84 ++++++++++++++++++++ .../airavata/gfac/core/context/TaskContext.java | 20 +++++ .../airavata/gfac/impl/GFacEngineImpl.java | 2 + .../impl/task/AdvancedSCPDataStageTask.java | 23 +++++- 4 files changed, 126 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/dad1204a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 784d214..3ee0461 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -1134,6 +1134,90 @@ public class GFacUtils { } } + public static void saveExperimentInput(ProcessContext processContext, String inputName, String inputVal) throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + String experimentId = processContext.getExperimentId(); + ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs(); + if (experimentInputs != null && !experimentInputs.isEmpty()){ + for (InputDataObjectType expInput : experimentInputs){ + if (expInput.getName().equals(inputName)){ + expInput.setValue(inputVal); + } + } + } + experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " : - Error while updating experiment inputs"; + throw new GFacException(msg, e); + } + } + + public static void saveProcessInput(ProcessContext processContext, String inputName, String inputVal) throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + String processId = processContext.getProcessId(); + ProcessModel processModel = (ProcessModel)experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); + List<InputDataObjectType> processInputs = processModel.getProcessInputs(); + if (processInputs != null && !processInputs.isEmpty()){ + for (InputDataObjectType processInput : processInputs){ + if (processInput.getName().equals(inputName)){ + processInput.setValue(inputVal); + } + } + } + experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " : - Error while updating experiment inputs"; + throw new GFacException(msg, e); + } + } + + public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + String experimentId = processContext.getExperimentId(); + ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs(); + if (experimentOutputs != null && !experimentOutputs.isEmpty()){ + for (OutputDataObjectType expOutput : experimentOutputs){ + if (expOutput.getName().equals(outputName)){ + expOutput.setValue(outputVal); + } + } + } + experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " : - Error while updating experiment outputs"; + throw new GFacException(msg, e); + } + } + + public static void saveProcessOutput(ProcessContext processContext, String outputName, String outputVal) throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + String processId = processContext.getProcessId(); + ProcessModel processModel = (ProcessModel)experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); + List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs(); + if (processOutputs != null && !processOutputs.isEmpty()){ + for (OutputDataObjectType processOutput : processOutputs){ + if (processOutput.getName().equals(outputName)){ + processOutput.setValue(outputVal); + } + } + } + experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " : - Error while updating experiment outputs"; + throw new GFacException(msg, e); + } + } + public static void handleProcessInterrupt(ProcessContext processContext) throws GFacException { if (processContext.isCancel()) { ProcessStatus pStatus = new ProcessStatus(ProcessState.CANCELLING); http://git-wip-us.apache.org/repos/asf/airavata/blob/dad1204a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java index 597fd2e..2ca6c6b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java @@ -20,6 +20,8 @@ */ package org.apache.airavata.gfac.core.context; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskModel; @@ -32,6 +34,8 @@ public class TaskContext { private TaskModel taskModel; private ProcessContext parentProcessContext; + private InputDataObjectType processInput; + private OutputDataObjectType processOutput; public TaskModel getTaskModel() { return taskModel; @@ -79,4 +83,20 @@ public class TaskContext { public String getLocalWorkingDir() { return getParentProcessContext().getLocalWorkingDir(); } + + public InputDataObjectType getProcessInput() { + return processInput; + } + + public void setProcessInput(InputDataObjectType processInput) { + this.processInput = processInput; + } + + public OutputDataObjectType getProcessOutput() { + return processOutput; + } + + public void setProcessOutput(OutputDataObjectType processOutput) { + this.processOutput = processOutput; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/dad1204a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 39dffb9..b0bdb65 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -438,6 +438,7 @@ public class GFacEngineImpl implements GFacEngine { ()); taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); taskCtx.setTaskModel(taskModel); + taskCtx.setProcessInput(processInput); return taskCtx; } @@ -462,6 +463,7 @@ public class GFacEngineImpl implements GFacEngine { submodel.setDestination("file://" + localWorkingDir); taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); taskCtx.setTaskModel(taskModel); + taskCtx.setProcessOutput(processOutput); return taskCtx; } http://git-wip-us.apache.org/repos/asf/airavata/blob/dad1204a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java index 2a0c54c..34a23df 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java @@ -30,6 +30,7 @@ import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.CredentialStoreException; +import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; @@ -41,12 +42,14 @@ import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.gfac.impl.SSHUtils; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.DataStagingTaskModel; import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.registry.cpi.ExperimentCatalog; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +60,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.List; import java.util.Map; public class AdvancedSCPDataStageTask implements Task{ @@ -201,8 +205,17 @@ public class AdvancedSCPDataStageTask implements Task{ errorModel.setActualErrorMessage(e.getMessage()); errorModel.setUserFriendlyMessage(msg); taskContext.getTaskModel().setTaskError(errorModel); + } catch (GFacException e) { + String msg = "Failed update experiment and process inputs and outputs"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); } - return status; + return status; } private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI @@ -219,7 +232,7 @@ public class AdvancedSCPDataStageTask implements Task{ } private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI, - String filePath) throws SSHApiException, AiravataException, IOException, JSchException { + String filePath) throws SSHApiException, AiravataException, IOException, JSchException, GFacException { /** * scp remote file from comute resource to airavata local */ @@ -229,7 +242,11 @@ public class AdvancedSCPDataStageTask implements Task{ * scp local file to remote client */ SSHUtils.scpTo(filePath, destinationURI.getPath(), sshSession); - } + // update output locations + GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); + GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); + + } private File getLocalDataDir(TaskContext taskContext) { String outputPath = ServerSettings.getLocalDataLocation();
