This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/helix-integration by this push:
new bc0016f Adding output file details to experiment output
bc0016f is described below
commit bc0016f65dfb0146c92bbd76cc25cb93650748ea
Author: dimuthu <[email protected]>
AuthorDate: Thu Mar 8 17:11:29 2018 -0500
Adding output file details to experiment output
---
.../airavata/helix/impl/task/AiravataTask.java | 74 +++++++++++++++++++---
.../impl/task/staging/OutputDataStagingTask.java | 9 ++-
.../helix/impl/workflow/PostWorkflowManager.java | 1 +
.../apache/airavata/helix/core/util/TaskUtil.java | 9 ++-
4 files changed, 79 insertions(+), 14 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 3ad8632..183b4e7 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -32,7 +32,10 @@ import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.*;
@@ -46,6 +49,7 @@ import org.apache.log4j.MDC;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.List;
public abstract class AiravataTask extends AbstractTask {
@@ -68,11 +72,16 @@ public abstract class AiravataTask extends AbstractTask {
@TaskParam(name = "gatewayId")
private String gatewayId;
+ @TaskParam(name = "Skip Status Publish")
+ private boolean skipTaskStatusPublish = false;
+
@TaskOutPort(name = "Next Task")
private OutPort nextTask;
protected TaskResult onSuccess(String message) {
- publishTaskState(TaskState.COMPLETED);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.COMPLETED);
+ }
String successMessage = "Task " + getTaskId() + " completed." +
(message != null ? " Message : " + message : "");
logger.info(successMessage);
return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED,
message));
@@ -104,11 +113,13 @@ public abstract class AiravataTask extends AbstractTask {
errorModel.setActualErrorMessage(errors.toString());
errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- publishTaskState(TaskState.FAILED);
- saveAndPublishProcessStatus();
- saveExperimentError(errorModel);
- saveProcessError(errorModel);
- saveTaskError(errorModel);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.FAILED);
+ saveAndPublishProcessStatus();
+ saveExperimentError(errorModel);
+ saveProcessError(errorModel);
+ saveTaskError(errorModel);
+ }
return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED :
TaskResult.Status.FAILED, errorMessage);
}
@@ -164,6 +175,41 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ public void saveExperimentOutput(String outputName, String outputVal) throws TaskOnFailException {
+ try {
+ ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+ for (OutputDataObjectType expOutput : experimentOutputs){
+ if (expOutput.getName().equals(outputName)){
+ DataProductModel dataProductModel = new DataProductModel();
+ dataProductModel.setGatewayId(getGatewayId());
+ dataProductModel.setOwnerName(getProcessModel().getUserName());
+ dataProductModel.setProductName(outputName);
+ dataProductModel.setDataProductType(DataProductType.FILE);
+
+ DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+ replicaLocationModel.setStorageResourceId(getTaskContext().getStorageResource().getStorageResourceId());
+ replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+ replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+ replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+ replicaLocationModel.setFilePath(outputVal);
+ dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+ ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+ String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+ expOutput.setValue(productUri);
+ }
+ }
+ }
+ experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+
+ } catch (RegistryException | AppCatalogException e) {
+ String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment outputs";
+ throw new TaskOnFailException(msg, true, e);
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
protected void saveExperimentError(ErrorModel errorModel) {
try {
@@ -218,7 +264,9 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
- publishTaskState(TaskState.EXECUTING);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.EXECUTING);
+ }
return onRun(helper, getTaskContext());
} finally {
MDC.clear();
@@ -234,7 +282,9 @@ public abstract class AiravataTask extends AbstractTask {
MDC.put("process", getProcessId());
MDC.put("gateway", getGatewayId());
MDC.put("task", getTaskId());
- publishTaskState(TaskState.CANCELED);
+ if (!skipTaskStatusPublish) {
+ publishTaskState(TaskState.CANCELED);
+ }
onCancel(getTaskContext());
} finally {
MDC.clear();
@@ -351,4 +401,12 @@ public abstract class AiravataTask extends AbstractTask {
public void setNextTask(OutPort nextTask) {
this.nextTask = nextTask;
}
+
+ public void setSkipTaskStatusPublish(boolean skipTaskStatusPublish) {
+ this.skipTaskStatusPublish = skipTaskStatusPublish;
+ }
+
+ public boolean isSkipTaskStatusPublish() {
+ return skipTaskStatusPublish;
+ }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 3f4fe89..2eddc47 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -166,13 +166,14 @@ public class OutputDataStagingTask extends DataStagingTask {
}
logger.info("Transferring file " + sourceFileName);
- transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+ transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
}
return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
} else {
// Downloading input file from the storage resource
- transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+ assert processOutput != null;
+ transferFile(processOutput.getName(), sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
}
@@ -190,7 +191,7 @@ public class OutputDataStagingTask extends DataStagingTask {
}
}
- private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
+ private void transferFile(String outputName, URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
StorageResourceAdaptor storageResourceAdaptor)
throws TaskOnFailException {
String localSourceFilePath = getLocalDataPath(fileName);
@@ -212,6 +213,8 @@ public class OutputDataStagingTask extends DataStagingTask {
throw new TaskOnFailException("Failed uploading the output file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, true, e);
}
+
+ saveExperimentOutput(outputName, destinationURI.toString());
}
@Override
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index c8ef45c..77e753c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -189,6 +189,7 @@ public class PostWorkflowManager {
completingTask.setExperimentId(experimentModel.getExperimentId());
completingTask.setProcessId(processModel.getProcessId());
completingTask.setTaskId("Completing-Task");
+ completingTask.setSkipTaskStatusPublish(true);
if (allTasks.size() > 0) {
allTasks.get(allTasks.size() - 1).setNextTask(new
OutPort(completingTask.getTaskId(), completingTask));
}
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
index a006c58..f58b365 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java
@@ -99,11 +99,14 @@ public class TaskUtil {
classField.setAccessible(true);
if (classField.getType().isAssignableFrom(String.class)) {
classField.set(instance, params.get(param.name()));
- } else if (classField.getType().isAssignableFrom(Integer.class)) {
+ } else if (classField.getType().isAssignableFrom(Integer.class) ||
+ classField.getType().isAssignableFrom(Integer.TYPE)) {
classField.set(instance, Integer.parseInt(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Long.class)) {
+ } else if (classField.getType().isAssignableFrom(Long.class) ||
+ classField.getType().isAssignableFrom(Long.TYPE)) {
classField.set(instance, Long.parseLong(params.get(param.name())));
- } else if (classField.getType().isAssignableFrom(Boolean.class)) {
+ } else if (classField.getType().isAssignableFrom(Boolean.class) ||
+ classField.getType().isAssignableFrom(Boolean.TYPE)) {
classField.set(instance, Boolean.parseBoolean(params.get(param.name())));
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].