This is an automated email from the ASF dual-hosted git repository.
lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 452fc69d22 Adding support to clean up experiment working directory on
cluster (#582)
452fc69d22 is described below
commit 452fc69d2270406c06319af674890ad43bac9d97
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sat Nov 29 06:54:58 2025 -0500
Adding support to clean up experiment working directory on cluster (#582)
* Adding support to clean up experiment working directory on cluster
* Adding commented delete dir functionality
* escaped the directory path, generated python thrift stubs, and updated
the java styles
---------
Co-authored-by: lahiruj <[email protected]>
---
.../init/07-cleanup-strategy-migration.sql | 4 +++
.../apache/airavata/agents/api/AgentAdaptor.java | 2 ++
.../agents/api/StorageResourceAdaptor.java | 2 ++
.../airavata/helix/adaptor/SSHJAgentAdaptor.java | 27 ++++++++++++++
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 42 ++++++++++++++++++++++
.../airavata/helix/impl/task/AiravataTask.java | 7 ++++
.../airavata/helix/impl/task/TaskContext.java | 13 +++++++
.../helix/impl/task/completing/CompletingTask.java | 18 ++++++++++
.../core/entities/expcatalog/ExperimentEntity.java | 13 +++++++
.../airavata/model/experiment/ttypes.py | 25 ++++++++++++-
.../data-models/experiment_model.thrift | 10 +++++-
11 files changed, 161 insertions(+), 2 deletions(-)
diff --git
a/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql
b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql
new file mode 100644
index 0000000000..4b40dbd7ed
--- /dev/null
+++ b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql
@@ -0,0 +1,4 @@
+USE experiment_catalog;
+
+-- Add the CLEANUP_STRATEGY column to EXPERIMENT. ExperimentEntity maps this
+-- column as a string-valued enum (ExperimentCleanupStrategy), so it stores the
+-- enum constant name; rows that do not set a value get 'NONE'.
+-- NOTE(review): ADD COLUMN IF NOT EXISTS is not accepted by every SQL dialect
+-- (presumably this targets MariaDB) -- confirm against the dev-container database.
+ALTER TABLE EXPERIMENT ADD COLUMN IF NOT EXISTS CLEANUP_STRATEGY VARCHAR(255)
DEFAULT 'NONE';
diff --git
a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
index 36baea7060..575e2c396f 100644
---
a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -43,6 +43,8 @@ public interface AgentAdaptor {
void createDirectory(String path, boolean recursive) throws AgentException;
+ /**
+ * Deletes the directory at {@code path} on the remote resource.
+ * NOTE(review): the implementations added alongside this declaration differ -
+ * SshAgentAdaptor runs {@code rm -rf} while SSHJAgentAdaptor calls {@code rmdir}
+ * on its SFTP client; confirm whether non-empty directories must be removed.
+ *
+ * @param path path of the directory to delete; both implementations in this
+ * change reject a null or blank value
+ * @throws AgentException if the path is rejected or the deletion fails
+ */
+ void deleteDirectory(String path) throws AgentException;
+
void uploadFile(String localFile, String remoteFile) throws AgentException;
void uploadFile(InputStream localInStream, FileMetadata metadata, String
remoteFile) throws AgentException;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
index 845dc04f78..83ee95945c 100644
---
a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
@@ -41,6 +41,8 @@ public interface StorageResourceAdaptor extends AgentAdaptor {
public void createDirectory(String path, boolean recursive) throws
AgentException;
+ /**
+ * Deletes the directory at {@code path} on the storage resource. Redeclares the
+ * method of the same signature inherited from {@link AgentAdaptor}.
+ *
+ * @param path path of the directory to delete
+ * @throws AgentException if the deletion fails
+ */
+ public void deleteDirectory(String path) throws AgentException;
+
public List<String> listDirectory(String path) throws AgentException;
public Boolean doesFileExist(String filePath) throws AgentException;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 2845afb1ef..495f9476d4 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -277,6 +277,33 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
}
+    /**
+     * Removes the remote directory at {@code path} using an SFTP client obtained
+     * from the SSHJ client. The SFTP client is always closed before returning.
+     * NOTE(review): this calls {@code rmdir}, whereas SshAgentAdaptor runs
+     * {@code rm -rf}; if rmdir maps to the plain SFTP remove-directory request it
+     * will presumably fail on a non-empty directory - confirm.
+     *
+     * @param path directory to remove; must not be null or blank
+     * @throws AgentException if {@code path} is null/blank or the removal fails
+     */
+    @Override
+    public void deleteDirectory(String path) throws AgentException {
+        boolean blankPath = path == null || path.trim().isEmpty();
+        if (blankPath) {
+            throw new AgentException("Directory path cannot be null or empty");
+        }
+
+        SFTPClientWrapper sftp = null;
+        try {
+            sftp = sshjClient.newSFTPClientWrapper();
+            sftp.rmdir(path);
+        } catch (Exception cause) {
+            // Mark the wrapper as errored only when the failure is a connection problem
+            // and a wrapper was actually obtained.
+            if (cause instanceof ConnectionException && sftp != null) {
+                sftp.setErrored(true);
+            }
+            logger.error("Error while deleting directory {}", path, cause);
+            throw new AgentException(cause);
+        } finally {
+            if (sftp != null) {
+                try {
+                    sftp.close();
+                } catch (IOException ignored) {
+                    // Ignore
+                }
+            }
+        }
+    }
+
@Override
public void uploadFile(String localFile, String remoteFile) throws
AgentException {
SCPFileTransferWrapper fileTransfer = null;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 8ee4237fd7..f2da8f8a7a 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -182,6 +182,48 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
}
+    /**
+     * Recursively deletes the directory at {@code path} on the remote host by
+     * running {@code rm -rf} over an SSH exec channel of the current session.
+     * Failure is detected by looking for an {@code rm:} prefix in the command's
+     * standard error output. The exec channel is always disconnected on exit.
+     *
+     * @param path directory to delete; must not be null or blank
+     * @throws AgentException if {@code path} is null/blank, the channel cannot be
+     *     opened or read, or {@code rm} reports an error on stderr
+     */
+    @Override
+    public void deleteDirectory(String path) throws AgentException {
+        if (path == null || path.trim().isEmpty()) {
+            throw new AgentException("Directory path cannot be null or empty");
+        }
+        // Single-quote the path for the remote shell; an embedded ' is closed,
+        // emitted inside double quotes, and the single-quoted string reopened.
+        String escapedPath = path.replace("'", "'\"'\"'");
+        // "--" ends option parsing so a path starting with '-' can never be
+        // interpreted as an rm option.
+        String command = "rm -rf -- '" + escapedPath + "'";
+        ChannelExec channelExec = null;
+        try {
+            channelExec = (ChannelExec) session.openChannel("exec");
+            StandardOutReader stdOutReader = new StandardOutReader();
+
+            channelExec.setCommand(command);
+            // Streams are obtained before connect() and read afterwards.
+            InputStream out = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
+            channelExec.connect();
+
+            stdOutReader.readStdOutFromStream(out);
+            stdOutReader.readStdErrFromStream(err);
+
+            if (stdOutReader.getStdError() != null && stdOutReader.getStdError().contains("rm:")) {
+                throw new AgentException(stdOutReader.getStdError());
+            }
+        } catch (JSchException e) {
+            logger.error(
+                    "Unable to retrieve command output. Command - {} on server - {}:{} connecting user name - {}",
+                    command,
+                    session.getHost(),
+                    session.getPort(),
+                    session.getUserName(),
+                    e);
+            throw new AgentException(e);
+        } catch (IOException e) {
+            logger.error("Failed to delete directory {}", path, e);
+            throw new AgentException("Failed to delete directory " + path, e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
+        }
+    }
+
public void uploadFile(String localFile, String remoteFile) throws
AgentException {
FileInputStream fis;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index f5f06c35ac..97dec9e976 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -70,6 +70,7 @@ public abstract class AiravataTask extends AbstractTask {
private static Publisher statusPublisher;
private ProcessModel processModel;
+ private ExperimentModel experimentModel;
private ComputeResourceDescription computeResourceDescription;
private TaskContext taskContext;
private String taskName;
@@ -519,6 +520,7 @@ public abstract class AiravataTask extends AbstractTask {
try {
logger.info("Loading context for task " + getTaskId());
processModel = getRegistryServiceClient().getProcess(processId);
+ experimentModel =
getRegistryServiceClient().getExperiment(experimentId);
this.computeResourceDescription =
getRegistryServiceClient().getComputeResource(this.processModel.getComputeResourceId());
@@ -527,6 +529,7 @@ public abstract class AiravataTask extends AbstractTask {
getProcessId(), getGatewayId(), getTaskId())
.setRegistryClient(getRegistryServiceClient())
.setProfileClient(getUserProfileClient())
+ .setExperimentModel(getExperimentModel())
.setProcessModel(getProcessModel());
this.taskContext = taskContextBuilder.build();
@@ -603,6 +606,10 @@ public abstract class AiravataTask extends AbstractTask {
return processModel;
}
+    /**
+     * Returns the experiment model held by this task. The field is assigned from
+     * the registry client while the task context is being loaded.
+     *
+     * @return the experiment model, or {@code null} if it has not been assigned
+     */
+    protected ExperimentModel getExperimentModel() {
+        return this.experimentModel;
+    }
+
public void setSkipAllStatusPublish(boolean skipAllStatusPublish) {
this.skipAllStatusPublish = skipAllStatusPublish;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 5bab41ae8f..46b38fb0aa 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -56,6 +56,7 @@ import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.data.movement.DataMovementInterface;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
@@ -91,6 +92,7 @@ public class TaskContext {
private String gatewayId;
private String taskId;
+ private ExperimentModel experimentModel;
private ProcessModel processModel;
private JobModel jobModel;
private Object subTaskModel = null;
@@ -176,6 +178,10 @@ public class TaskContext {
this.processModel = processModel;
}
+ /**
+ * Stores the experiment model associated with this task context.
+ *
+ * @param experimentModel experiment model to keep on the context
+ */
+ public void setExperimentModel(ExperimentModel experimentModel) {
+ this.experimentModel = experimentModel;
+ }
+
public String getWorkingDir() throws Exception {
if (workingDir == null) {
if
(processModel.getProcessResourceSchedule().getStaticWorkingDir() != null) {
@@ -1002,6 +1008,7 @@ public class TaskContext {
private RegistryService.Client registryClient;
private UserProfileService.Client profileClient;
private ProcessModel processModel;
+ private ExperimentModel experimentModel;
@SuppressWarnings("WeakerAccess")
public TaskContextBuilder(String processId, String gatewayId, String
taskId) throws Exception {
@@ -1018,6 +1025,11 @@ public class TaskContext {
return this;
}
+ /**
+ * Records the experiment model that {@code build()} passes on to the created
+ * TaskContext.
+ *
+ * @param experimentModel experiment model to hand to the context
+ * @return this builder, to allow call chaining
+ */
+ public TaskContextBuilder setExperimentModel(ExperimentModel
experimentModel) {
+ this.experimentModel = experimentModel;
+ return this;
+ }
+
public TaskContextBuilder setRegistryClient(RegistryService.Client
registryClient) {
this.registryClient = registryClient;
return this;
@@ -1040,6 +1052,7 @@ public class TaskContext {
TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
ctx.setRegistryClient(registryClient);
ctx.setProcessModel(processModel);
+ ctx.setExperimentModel(experimentModel);
ctx.setProfileClient(profileClient);
return ctx;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
index ca0c49ea4c..83528fc366 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
@@ -19,10 +19,12 @@
*/
package org.apache.airavata.helix.impl.task.completing;
+import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.experiment.ExperimentCleanupStrategy;
import org.apache.airavata.model.status.ProcessState;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
@@ -39,6 +41,22 @@ public class CompletingTask extends AiravataTask {
logger.info("Process " + getProcessId() + " successfully completed");
saveAndPublishProcessStatus(ProcessState.COMPLETED);
cleanup();
+
+ // Best-effort removal of the remote working directory. This runs after the
+ // process status has been saved as COMPLETED; any failure is only logged so
+ // the task still returns its success result below.
+ try {
+ // Only the ALWAYS strategy triggers deletion on this path.
+ // NOTE(review): ONLY_COMPLETED is not handled here - confirm whether that
+ // is intentional for this change.
+ if (getExperimentModel().getCleanUpStrategy() ==
ExperimentCleanupStrategy.ALWAYS) {
+ AgentAdaptor adaptor = helper.getAdaptorSupport()
+ .fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol(),
+
getTaskContext().getComputeResourceCredentialToken(),
+
getTaskContext().getComputeResourceLoginUserName());
+ logger.info("Cleaning up the working directory {}",
taskContext.getWorkingDir());
+ adaptor.deleteDirectory(getTaskContext().getWorkingDir());
+ }
+ } catch (Exception e) {
+ // Broad catch: a null experiment model or an adaptor error ends up here.
+ logger.error("Failed clean up experiment " + getExperimentId(), e);
+ }
return onSuccess("Process " + getProcessId() + " successfully
completed");
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
index 848c688e1b..0a208ecf8c 100644
---
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
+++
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
@@ -23,6 +23,7 @@ import jakarta.persistence.*;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.List;
+import org.apache.airavata.model.experiment.ExperimentCleanupStrategy;
import org.apache.airavata.model.experiment.ExperimentType;
/**
@@ -47,6 +48,10 @@ public class ExperimentEntity implements Serializable {
@Enumerated(EnumType.STRING)
public ExperimentType experimentType;
+ // Cleanup strategy of the experiment, persisted in the CLEANUP_STRATEGY column
+ // as the enum constant's name (EnumType.STRING).
+ @Column(name = "CLEANUP_STRATEGY")
+ @Enumerated(EnumType.STRING)
+ public ExperimentCleanupStrategy cleanupStrategy;
+
@Column(name = "USER_NAME")
public String userName;
@@ -264,6 +269,14 @@ public class ExperimentEntity implements Serializable {
this.experimentStatus = experimentStatus;
}
+    /**
+     * Returns the cleanup strategy stored on this entity.
+     *
+     * @return the cleanup strategy, or {@code null} if none has been set
+     */
+    public ExperimentCleanupStrategy getCleanupStrategy() {
+        return this.cleanupStrategy;
+    }
+
+ /**
+ * Sets the cleanup strategy stored on this entity.
+ *
+ * @param cleanupStrategy cleanup strategy to store
+ */
+ public void setCleanupStrategy(ExperimentCleanupStrategy cleanupStrategy) {
+ this.cleanupStrategy = cleanupStrategy;
+ }
+
public List<ProcessEntity> getProcesses() {
return processes;
}
diff --git a/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
b/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
index abbcfe59b7..93d36e2732 100644
--- a/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
+++ b/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
@@ -51,6 +51,14 @@ class ProjectSearchFields(IntEnum):
+class ExperimentCleanupStrategy(IntEnum):
+ NONE = 0
+ ALWAYS = 1
+ ONLY_COMPLETED = 2
+ ONLY_FAILED = 3
+
+
+
class UserConfigurationDataModel(object):
"""
A structure holding the experiment configuration.
@@ -298,12 +306,13 @@ class ExperimentModel(object):
- errors
- processes
- workflow
+ - cleanUpStrategy
"""
thrift_spec: typing.Any = None
- def __init__(self, experimentId: str = "DO_NOT_SET_AT_CLIENTS", projectId:
str = None, gatewayId: str = None, experimentType: ExperimentType =
ExperimentType.SINGLE_APPLICATION, userName: str = None, experimentName: str =
None, creationTime: typing.Optional[int] = None, description:
typing.Optional[str] = None, executionId: typing.Optional[str] = None,
gatewayExecutionId: typing.Optional[str] = None, gatewayInstanceId:
typing.Optional[str] = None, enableEmailNotification: typing. [...]
+ def __init__(self, experimentId: str = "DO_NOT_SET_AT_CLIENTS", projectId:
str = None, gatewayId: str = None, experimentType: ExperimentType =
ExperimentType.SINGLE_APPLICATION, userName: str = None, experimentName: str =
None, creationTime: typing.Optional[int] = None, description:
typing.Optional[str] = None, executionId: typing.Optional[str] = None,
gatewayExecutionId: typing.Optional[str] = None, gatewayInstanceId:
typing.Optional[str] = None, enableEmailNotification: typing. [...]
self.experimentId: str = experimentId
self.projectId: str = projectId
self.gatewayId: str = gatewayId
@@ -324,11 +333,15 @@ class ExperimentModel(object):
self.errors:
typing.Optional[list[airavata.model.commons.ttypes.ErrorModel]] = errors
self.processes:
typing.Optional[list[airavata.model.process.ttypes.ProcessModel]] = processes
self.workflow:
typing.Optional[airavata.model.workflow.ttypes.AiravataWorkflow] = workflow
+ self.cleanUpStrategy: typing.Optional[ExperimentCleanupStrategy] =
cleanUpStrategy
def __setattr__(self, name, value):
if name == "experimentType":
super().__setattr__(name, value if hasattr(value, 'value') else
ExperimentType.__members__.get(value))
return
+ if name == "cleanUpStrategy":
+ super().__setattr__(name, value if hasattr(value, 'value') else
ExperimentCleanupStrategy.__members__.get(value))
+ return
super().__setattr__(name, value)
@@ -478,6 +491,11 @@ class ExperimentModel(object):
self.workflow.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 21:
+ if ftype == TType.I32:
+ self.cleanUpStrategy =
ExperimentCleanupStrategy(iprot.readI32())
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -587,6 +605,10 @@ class ExperimentModel(object):
oprot.writeFieldBegin('workflow', TType.STRUCT, 20)
self.workflow.write(oprot)
oprot.writeFieldEnd()
+ if self.cleanUpStrategy is not None:
+ oprot.writeFieldBegin('cleanUpStrategy', TType.I32, 21)
+ oprot.writeI32(self.cleanUpStrategy.value)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -1080,6 +1102,7 @@ ExperimentModel.thrift_spec = (
(18, TType.LIST, 'errors', (TType.STRUCT,
[airavata.model.commons.ttypes.ErrorModel, None], False), None, ), # 18
(19, TType.LIST, 'processes', (TType.STRUCT,
[airavata.model.process.ttypes.ProcessModel, None], False), None, ), # 19
(20, TType.STRUCT, 'workflow',
[airavata.model.workflow.ttypes.AiravataWorkflow, None], None, ), # 20
+ (21, TType.I32, 'cleanUpStrategy', None,
ExperimentCleanupStrategy.NONE, ), # 21
)
all_structs.append(ExperimentSummaryModel)
ExperimentSummaryModel.thrift_spec = (
diff --git a/thrift-interface-descriptions/data-models/experiment_model.thrift
b/thrift-interface-descriptions/data-models/experiment_model.thrift
index 07ec759fe1..6d3ec9ebe2 100644
--- a/thrift-interface-descriptions/data-models/experiment_model.thrift
+++ b/thrift-interface-descriptions/data-models/experiment_model.thrift
@@ -73,6 +73,13 @@ struct UserConfigurationDataModel {
13: optional list<scheduling_model.ComputationalResourceSchedulingModel>
autoScheduledCompResourceSchedulingList,
}
+/**
+ * Strategy for cleaning up an experiment's working directory on the cluster.
+ *
+ * No explicit values are given, so the constants are numbered in declaration
+ * order starting at 0 (NONE = 0, ALWAYS = 1, ONLY_COMPLETED = 2, ONLY_FAILED = 3),
+ * matching the generated Python stub. Do not reorder them.
+ *
+ * NOTE(review): in this change only ALWAYS is acted on (by CompletingTask);
+ * the intended handling of ONLY_COMPLETED and ONLY_FAILED should be confirmed.
+ */
+enum ExperimentCleanupStrategy {
+ NONE,
+ ALWAYS,
+ ONLY_COMPLETED,
+ ONLY_FAILED
+}
+
/**
* A structure holding the experiment metadata and its child models.
*
@@ -110,7 +117,8 @@ struct ExperimentModel {
17: optional list<status_models.ExperimentStatus> experimentStatus,
18: optional list<airavata_commons.ErrorModel> errors,
19: optional list<process_model.ProcessModel> processes,
- 20: optional workflow_model.AiravataWorkflow workflow
+ 20: optional workflow_model.AiravataWorkflow workflow,
+ 21: optional ExperimentCleanupStrategy cleanUpStrategy =
ExperimentCleanupStrategy.NONE
}
struct ExperimentSummaryModel {