This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch experiment-cleanup-support in repository https://gitbox.apache.org/repos/asf/airavata.git
commit fa4dc3466b48cfca16060a67b56a37218aea9055 Author: DImuthuUpe <[email protected]> AuthorDate: Fri Nov 28 17:52:53 2025 -0500 Adding support to clean up experiment working directory on cluster --- .../init/07-cleanup-strategy-migration.sql | 4 +++ .../apache/airavata/agents/api/AgentAdaptor.java | 2 ++ .../agents/api/StorageResourceAdaptor.java | 2 ++ .../airavata/helix/adaptor/SSHJAgentAdaptor.java | 24 +++++++++++++++ .../airavata/helix/agent/ssh/SshAgentAdaptor.java | 34 ++++++++++++++++++++++ .../airavata/helix/impl/task/AiravataTask.java | 7 +++++ .../airavata/helix/impl/task/TaskContext.java | 13 +++++++++ .../helix/impl/task/completing/CompletingTask.java | 19 ++++++++++++ .../core/entities/expcatalog/ExperimentEntity.java | 14 +++++++++ .../data-models/experiment_model.thrift | 10 ++++++- 10 files changed, 128 insertions(+), 1 deletion(-) diff --git a/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql new file mode 100644 index 0000000000..7214580815 --- /dev/null +++ b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql @@ -0,0 +1,4 @@ +USE experiment_catalog; + +-- Add CLEANUP_STRATEGY column to EXPERIMENT +ALTER TABLE EXPERIMENT ADD COLUMN IF NOT EXISTS CLEANUP_STRATEGY VARCHAR(255) DEFAULT 'NONE'; diff --git a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java index 36baea7060..575e2c396f 100644 --- a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java +++ b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java @@ -43,6 +43,8 @@ public interface AgentAdaptor { void createDirectory(String path, boolean recursive) throws AgentException; + void deleteDirectory(String path) throws AgentException; + void uploadFile(String localFile, String remoteFile) throws AgentException; void 
uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException; diff --git a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java index 845dc04f78..83ee95945c 100644 --- a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java +++ b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java @@ -41,6 +41,8 @@ public interface StorageResourceAdaptor extends AgentAdaptor { public void createDirectory(String path, boolean recursive) throws AgentException; + public void deleteDirectory(String path) throws AgentException; + public List<String> listDirectory(String path) throws AgentException; public Boolean doesFileExist(String filePath) throws AgentException; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java index 2845afb1ef..b687c3093d 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java @@ -277,6 +277,30 @@ public class SSHJAgentAdaptor implements AgentAdaptor { } } + @Override + public void deleteDirectory(String path) throws AgentException { + SFTPClientWrapper sftpClient = null; + try { + sftpClient = sshjClient.newSFTPClientWrapper(); + sftpClient.rmdir(path); + } catch (Exception e) { + if (e instanceof ConnectionException) { + Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + } + logger.error("Error while deleting directory " + path, e); + throw new AgentException(e); + + } finally { + Optional.ofNullable(sftpClient).ifPresent(client -> { + try { + client.close(); + } catch (IOException e) { + // Ignore + } + }); + } + } + @Override public void uploadFile(String 
localFile, String remoteFile) throws AgentException { SCPFileTransferWrapper fileTransfer = null; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java index 8ee4237fd7..20c9d27387 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java @@ -182,6 +182,40 @@ public class SshAgentAdaptor implements AgentAdaptor { } } + @Override + public void deleteDirectory(String path) throws AgentException { + String command = "rm -rf " + path; + ChannelExec channelExec = null; + try { + channelExec = (ChannelExec) session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + channelExec.setCommand(command); + InputStream out = channelExec.getInputStream(); + InputStream err = channelExec.getErrStream(); + channelExec.connect(); + + stdOutReader.readStdOutFromStream(out); + stdOutReader.readStdErrFromStream(err); + + if (stdOutReader.getStdError() != null && stdOutReader.getStdError().contains("mkdir:")) { + throw new AgentException(stdOutReader.getStdError()); + } + } catch (JSchException e) { + System.out.println("Unable to retrieve command output. 
Command - " + command + " on server - " + + session.getHost() + ":" + session.getPort() + " connecting user name - " + + session.getUserName()); + throw new AgentException(e); + } catch (IOException e) { + logger.error("Failed to delete directory " + path, e); + throw new AgentException("Failed to delete directory " + path, e); + } finally { + if (channelExec != null) { + channelExec.disconnect(); + } + } + } + public void uploadFile(String localFile, String remoteFile) throws AgentException { FileInputStream fis; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index f5f06c35ac..142ffb13b1 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -70,6 +70,7 @@ public abstract class AiravataTask extends AbstractTask { private static Publisher statusPublisher; private ProcessModel processModel; + private ExperimentModel experimentModel; private ComputeResourceDescription computeResourceDescription; private TaskContext taskContext; private String taskName; @@ -519,6 +520,7 @@ public abstract class AiravataTask extends AbstractTask { try { logger.info("Loading context for task " + getTaskId()); processModel = getRegistryServiceClient().getProcess(processId); + experimentModel = getRegistryServiceClient().getExperiment(experimentId); this.computeResourceDescription = getRegistryServiceClient().getComputeResource(this.processModel.getComputeResourceId()); @@ -527,6 +529,7 @@ public abstract class AiravataTask extends AbstractTask { getProcessId(), getGatewayId(), getTaskId()) .setRegistryClient(getRegistryServiceClient()) .setProfileClient(getUserProfileClient()) + .setExperimentModel(getExperimentModel()) .setProcessModel(getProcessModel()); this.taskContext = taskContextBuilder.build(); @@ -603,6 +606,10 @@ public 
abstract class AiravataTask extends AbstractTask { return processModel; } + protected ExperimentModel getExperimentModel() { + return experimentModel; + } + public void setSkipAllStatusPublish(boolean skipAllStatusPublish) { this.skipAllStatusPublish = skipAllStatusPublish; } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index 5bab41ae8f..46b38fb0aa 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -56,6 +56,7 @@ import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.data.movement.DataMovementInterface; import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; @@ -91,6 +92,7 @@ public class TaskContext { private String gatewayId; private String taskId; + private ExperimentModel experimentModel; private ProcessModel processModel; private JobModel jobModel; private Object subTaskModel = null; @@ -176,6 +178,10 @@ public class TaskContext { this.processModel = processModel; } + public void setExperimentModel(ExperimentModel experimentModel) { + this.experimentModel = experimentModel; + } + public String getWorkingDir() throws Exception { if (workingDir == null) { if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null) { @@ -1002,6 +1008,7 @@ public class TaskContext { private RegistryService.Client registryClient; private UserProfileService.Client profileClient; private ProcessModel processModel; + private ExperimentModel 
experimentModel; @SuppressWarnings("WeakerAccess") public TaskContextBuilder(String processId, String gatewayId, String taskId) throws Exception { @@ -1018,6 +1025,11 @@ public class TaskContext { return this; } + public TaskContextBuilder setExperimentModel(ExperimentModel experimentModel) { + this.experimentModel = experimentModel; + return this; + } + public TaskContextBuilder setRegistryClient(RegistryService.Client registryClient) { this.registryClient = registryClient; return this; @@ -1040,6 +1052,7 @@ public class TaskContext { TaskContext ctx = new TaskContext(processId, gatewayId, taskId); ctx.setRegistryClient(registryClient); ctx.setProcessModel(processModel); + ctx.setExperimentModel(experimentModel); ctx.setProfileClient(profileClient); return ctx; } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java index ca0c49ea4c..41ef2ef02d 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java @@ -19,10 +19,12 @@ */ package org.apache.airavata.helix.impl.task.completing; +import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.helix.impl.task.AiravataTask; import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.experiment.ExperimentCleanupStrategy; import org.apache.airavata.model.status.ProcessState; import org.apache.helix.task.TaskResult; import org.slf4j.Logger; @@ -39,6 +41,23 @@ public class CompletingTask extends AiravataTask { logger.info("Process " + getProcessId() + " successfully completed"); saveAndPublishProcessStatus(ProcessState.COMPLETED); cleanup(); + + try { + if 
(getExperimentModel().getCleanUpStrategy() == ExperimentCleanupStrategy.ALWAYS) { + AgentAdaptor adaptor = helper + .getAdaptorSupport() + .fetchAdaptor( + getTaskContext().getGatewayId(), + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol(), + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); + logger.info("Cleaning up the working directory {}", taskContext.getWorkingDir()); + //adaptor.deleteDirectory(getTaskContext().getWorkingDir()); + } + } catch (Exception e) { + logger.error("Failed clean up experiment " + getExperimentId(), e); + } return onSuccess("Process " + getProcessId() + " successfully completed"); } diff --git a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java index 848c688e1b..3b0a024419 100644 --- a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java +++ b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java @@ -23,6 +23,8 @@ import jakarta.persistence.*; import java.io.Serializable; import java.sql.Timestamp; import java.util.List; + +import org.apache.airavata.model.experiment.ExperimentCleanupStrategy; import org.apache.airavata.model.experiment.ExperimentType; /** @@ -47,6 +49,10 @@ public class ExperimentEntity implements Serializable { @Enumerated(EnumType.STRING) public ExperimentType experimentType; + @Column(name = "CLEANUP_STRATEGY") + @Enumerated(EnumType.STRING) + public ExperimentCleanupStrategy cleanupStrategy; + @Column(name = "USER_NAME") public String userName; @@ -264,6 +270,14 @@ public class ExperimentEntity implements Serializable { this.experimentStatus = experimentStatus; } + public ExperimentCleanupStrategy getCleanupStrategy() { + return cleanupStrategy; + } + + public void 
setCleanupStrategy(ExperimentCleanupStrategy cleanupStrategy) { + this.cleanupStrategy = cleanupStrategy; + } + public List<ProcessEntity> getProcesses() { return processes; } diff --git a/thrift-interface-descriptions/data-models/experiment_model.thrift b/thrift-interface-descriptions/data-models/experiment_model.thrift index 07ec759fe1..6d3ec9ebe2 100644 --- a/thrift-interface-descriptions/data-models/experiment_model.thrift +++ b/thrift-interface-descriptions/data-models/experiment_model.thrift @@ -73,6 +73,13 @@ struct UserConfigurationDataModel { 13: optional list<scheduling_model.ComputationalResourceSchedulingModel> autoScheduledCompResourceSchedulingList, } +enum ExperimentCleanupStrategy { + NONE, + ALWAYS, + ONLY_COMPLETED, + ONLY_FAILED +} + /** * A structure holding the experiment metadata and its child models. * @@ -110,7 +117,8 @@ struct ExperimentModel { 17: optional list<status_models.ExperimentStatus> experimentStatus, 18: optional list<airavata_commons.ErrorModel> errors, 19: optional list<process_model.ProcessModel> processes, - 20: optional workflow_model.AiravataWorkflow workflow + 20: optional workflow_model.AiravataWorkflow workflow, + 21: optional ExperimentCleanupStrategy cleanUpStrategy = ExperimentCleanupStrategy.NONE } struct ExperimentSummaryModel {
