This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 452fc69d22 Adding support to clean up experiment working directory on 
cluster (#582)
452fc69d22 is described below

commit 452fc69d2270406c06319af674890ad43bac9d97
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sat Nov 29 06:54:58 2025 -0500

    Adding support to clean up experiment working directory on cluster (#582)
    
    * Adding support to clean up experiment working directory on cluster
    
    * Adding commented delete dir functionality
    
    * escaped the directory path, generated python thrift stubs, and updated 
the java styles
    
    ---------
    
    Co-authored-by: lahiruj <[email protected]>
---
 .../init/07-cleanup-strategy-migration.sql         |  4 +++
 .../apache/airavata/agents/api/AgentAdaptor.java   |  2 ++
 .../agents/api/StorageResourceAdaptor.java         |  2 ++
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 27 ++++++++++++++
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  | 42 ++++++++++++++++++++++
 .../airavata/helix/impl/task/AiravataTask.java     |  7 ++++
 .../airavata/helix/impl/task/TaskContext.java      | 13 +++++++
 .../helix/impl/task/completing/CompletingTask.java | 18 ++++++++++
 .../core/entities/expcatalog/ExperimentEntity.java | 13 +++++++
 .../airavata/model/experiment/ttypes.py            | 25 ++++++++++++-
 .../data-models/experiment_model.thrift            | 10 +++++-
 11 files changed, 161 insertions(+), 2 deletions(-)

diff --git 
a/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql 
b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql
new file mode 100644
index 0000000000..4b40dbd7ed
--- /dev/null
+++ b/.devcontainer/database_scripts/init/07-cleanup-strategy-migration.sql
@@ -0,0 +1,4 @@
+USE experiment_catalog;
+
+-- Add cleanupStrategy flag to EXPERIMENT
+ALTER TABLE EXPERIMENT ADD COLUMN IF NOT EXISTS CLEANUP_STRATEGY VARCHAR(255) 
DEFAULT 'NONE';
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java 
b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
index 36baea7060..575e2c396f 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -43,6 +43,8 @@ public interface AgentAdaptor {
 
     void createDirectory(String path, boolean recursive) throws AgentException;
 
+    /**
+     * Deletes the directory at {@code path}.
+     *
+     * NOTE(review): whether a non-empty directory is removed is not specified here and
+     * appears to be left to each implementation — confirm the intended contract.
+     *
+     * @param path directory to delete
+     * @throws AgentException if the directory cannot be deleted
+     */
+    void deleteDirectory(String path) throws AgentException;
+
     void uploadFile(String localFile, String remoteFile) throws AgentException;
 
     void uploadFile(InputStream localInStream, FileMetadata metadata, String 
remoteFile) throws AgentException;
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
 
b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
index 845dc04f78..83ee95945c 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
@@ -41,6 +41,8 @@ public interface StorageResourceAdaptor extends AgentAdaptor {
 
     public void createDirectory(String path, boolean recursive) throws 
AgentException;
 
+    /**
+     * Deletes the directory at {@code path}.
+     *
+     * @param path directory to delete
+     * @throws AgentException if the directory cannot be deleted
+     */
+    public void deleteDirectory(String path) throws AgentException;
+
     public List<String> listDirectory(String path) throws AgentException;
 
     public Boolean doesFileExist(String filePath) throws AgentException;
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 2845afb1ef..495f9476d4 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -277,6 +277,33 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    /**
+     * Removes the directory at {@code path} by issuing an {@code rmdir} through a freshly
+     * obtained SFTP client wrapper, which is always closed before returning.
+     *
+     * NOTE(review): an SFTP rmdir usually refuses non-empty directories — confirm that
+     * {@code SFTPClientWrapper#rmdir} is sufficient when the directory still holds files.
+     *
+     * @param path remote directory to remove; must not be null or blank
+     * @throws AgentException if {@code path} is null/blank or the SFTP operation fails
+     */
+    @Override
+    public void deleteDirectory(String path) throws AgentException {
+        if (path == null || path.trim().isEmpty()) {
+            throw new AgentException("Directory path cannot be null or empty");
+        }
+        SFTPClientWrapper sftp = null;
+        try {
+            sftp = sshjClient.newSFTPClientWrapper();
+            sftp.rmdir(path);
+        } catch (Exception e) {
+            // Mark the wrapper as errored on connection failures (only possible once it exists).
+            if (e instanceof ConnectionException && sftp != null) {
+                sftp.setErrored(true);
+            }
+            logger.error("Error while deleting directory {}", path, e);
+            throw new AgentException(e);
+        } finally {
+            if (sftp != null) {
+                try {
+                    sftp.close();
+                } catch (IOException ignored) {
+                    // Ignore
+                }
+            }
+        }
+    }
+
     @Override
     public void uploadFile(String localFile, String remoteFile) throws 
AgentException {
         SCPFileTransferWrapper fileTransfer = null;
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 8ee4237fd7..f2da8f8a7a 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -182,6 +182,48 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    /**
+     * Deletes {@code path} on the remote host by running {@code rm -rf} over an SSH exec channel.
+     *
+     * The path is wrapped in single quotes (embedded single quotes are escaped) and preceded by
+     * {@code --} so that a path starting with {@code -} is never parsed by {@code rm} as options.
+     * Failure is detected by looking for an {@code rm:} diagnostic on the command's stderr.
+     *
+     * @param path remote directory to delete; must not be null or blank
+     * @throws AgentException if {@code path} is null/blank, the channel cannot be opened or read,
+     *                        or {@code rm} reports an error
+     */
+    @Override
+    public void deleteDirectory(String path) throws AgentException {
+        if (path == null || path.trim().isEmpty()) {
+            throw new AgentException("Directory path cannot be null or empty");
+        }
+        // Inside single quotes nothing is special except the quote itself:
+        // close the quote, emit a double-quoted ', then reopen ->  '"'"'
+        String escapedPath = path.replace("'", "'\"'\"'");
+        // "--" terminates option parsing, so e.g. "-foo" is treated as an operand, not as flags.
+        String command = "rm -rf -- '" + escapedPath + "'";
+        ChannelExec channelExec = null;
+        try {
+            channelExec = (ChannelExec) session.openChannel("exec");
+            StandardOutReader stdOutReader = new StandardOutReader();
+
+            channelExec.setCommand(command);
+            // Streams must be obtained before connect() so no output is missed.
+            InputStream out = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
+            channelExec.connect();
+
+            stdOutReader.readStdOutFromStream(out);
+            stdOutReader.readStdErrFromStream(err);
+
+            String stdError = stdOutReader.getStdError();
+            if (stdError != null && stdError.contains("rm:")) {
+                throw new AgentException(stdError);
+            }
+        } catch (JSchException e) {
+            logger.error(
+                    "Unable to retrieve command output. Command - {} on server - {}:{} connecting user name - {}",
+                    command,
+                    session.getHost(),
+                    session.getPort(),
+                    session.getUserName(),
+                    e);
+            throw new AgentException(e);
+        } catch (IOException e) {
+            logger.error("Failed to delete directory {}", path, e);
+            throw new AgentException("Failed to delete directory " + path, e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
+        }
+    }
+
     public void uploadFile(String localFile, String remoteFile) throws 
AgentException {
 
         FileInputStream fis;
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index f5f06c35ac..97dec9e976 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -70,6 +70,7 @@ public abstract class AiravataTask extends AbstractTask {
     private static Publisher statusPublisher;
 
     private ProcessModel processModel;
+    private ExperimentModel experimentModel;
     private ComputeResourceDescription computeResourceDescription;
     private TaskContext taskContext;
     private String taskName;
@@ -519,6 +520,7 @@ public abstract class AiravataTask extends AbstractTask {
         try {
             logger.info("Loading context for task " + getTaskId());
             processModel = getRegistryServiceClient().getProcess(processId);
+            experimentModel = 
getRegistryServiceClient().getExperiment(experimentId);
 
             this.computeResourceDescription =
                     
getRegistryServiceClient().getComputeResource(this.processModel.getComputeResourceId());
@@ -527,6 +529,7 @@ public abstract class AiravataTask extends AbstractTask {
                             getProcessId(), getGatewayId(), getTaskId())
                     .setRegistryClient(getRegistryServiceClient())
                     .setProfileClient(getUserProfileClient())
+                    .setExperimentModel(getExperimentModel())
                     .setProcessModel(getProcessModel());
 
             this.taskContext = taskContextBuilder.build();
@@ -603,6 +606,10 @@ public abstract class AiravataTask extends AbstractTask {
         return processModel;
     }
 
+    /**
+     * Returns the experiment model held by this task.
+     *
+     * @return the experiment model, or {@code null} if it has not been assigned yet
+     */
+    protected ExperimentModel getExperimentModel() {
+        return this.experimentModel;
+    }
+
     public void setSkipAllStatusPublish(boolean skipAllStatusPublish) {
         this.skipAllStatusPublish = skipAllStatusPublish;
     }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 5bab41ae8f..46b38fb0aa 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -56,6 +56,7 @@ import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.data.movement.DataMovementInterface;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
 import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
@@ -91,6 +92,7 @@ public class TaskContext {
     private String gatewayId;
     private String taskId;
 
+    private ExperimentModel experimentModel;
     private ProcessModel processModel;
     private JobModel jobModel;
     private Object subTaskModel = null;
@@ -176,6 +178,10 @@ public class TaskContext {
         this.processModel = processModel;
     }
 
+    /**
+     * Stores the experiment model on this context.
+     *
+     * @param experimentModel experiment model to keep; stored as given, without validation
+     */
+    public void setExperimentModel(ExperimentModel experimentModel) {
+        this.experimentModel = experimentModel;
+    }
+
     public String getWorkingDir() throws Exception {
         if (workingDir == null) {
             if 
(processModel.getProcessResourceSchedule().getStaticWorkingDir() != null) {
@@ -1002,6 +1008,7 @@ public class TaskContext {
         private RegistryService.Client registryClient;
         private UserProfileService.Client profileClient;
         private ProcessModel processModel;
+        private ExperimentModel experimentModel;
 
         @SuppressWarnings("WeakerAccess")
         public TaskContextBuilder(String processId, String gatewayId, String 
taskId) throws Exception {
@@ -1018,6 +1025,11 @@ public class TaskContext {
             return this;
         }
 
+        /**
+         * Records the experiment model on this builder.
+         *
+         * @param experimentModel experiment model to keep; stored as given, without validation
+         * @return this builder, to allow call chaining
+         */
+        public TaskContextBuilder setExperimentModel(ExperimentModel 
experimentModel) {
+            this.experimentModel = experimentModel;
+            return this;
+        }
+
         public TaskContextBuilder setRegistryClient(RegistryService.Client 
registryClient) {
             this.registryClient = registryClient;
             return this;
@@ -1040,6 +1052,7 @@ public class TaskContext {
             TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
             ctx.setRegistryClient(registryClient);
             ctx.setProcessModel(processModel);
+            ctx.setExperimentModel(experimentModel);
             ctx.setProfileClient(profileClient);
             return ctx;
         }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
index ca0c49ea4c..83528fc366 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
@@ -19,10 +19,12 @@
 */
 package org.apache.airavata.helix.impl.task.completing;
 
+import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.experiment.ExperimentCleanupStrategy;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.Logger;
@@ -39,6 +41,22 @@ public class CompletingTask extends AiravataTask {
         logger.info("Process " + getProcessId() + " successfully completed");
         saveAndPublishProcessStatus(ProcessState.COMPLETED);
         cleanup();
+
+        try {
+            if (getExperimentModel().getCleanUpStrategy() == 
ExperimentCleanupStrategy.ALWAYS) {
+                AgentAdaptor adaptor = helper.getAdaptorSupport()
+                        .fetchAdaptor(
+                                getTaskContext().getGatewayId(),
+                                getTaskContext().getComputeResourceId(),
+                                getTaskContext().getJobSubmissionProtocol(),
+                                
getTaskContext().getComputeResourceCredentialToken(),
+                                
getTaskContext().getComputeResourceLoginUserName());
+                logger.info("Cleaning up the working directory {}", 
taskContext.getWorkingDir());
+                adaptor.deleteDirectory(getTaskContext().getWorkingDir());
+            }
+        } catch (Exception e) {
+            logger.error("Failed clean up experiment " + getExperimentId(), e);
+        }
         return onSuccess("Process " + getProcessId() + " successfully 
completed");
     }
 
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
 
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
index 848c688e1b..0a208ecf8c 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
@@ -23,6 +23,7 @@ import jakarta.persistence.*;
 import java.io.Serializable;
 import java.sql.Timestamp;
 import java.util.List;
+import org.apache.airavata.model.experiment.ExperimentCleanupStrategy;
 import org.apache.airavata.model.experiment.ExperimentType;
 
 /**
@@ -47,6 +48,10 @@ public class ExperimentEntity implements Serializable {
     @Enumerated(EnumType.STRING)
     public ExperimentType experimentType;
 
+    @Column(name = "CLEANUP_STRATEGY")
+    @Enumerated(EnumType.STRING)
+    public ExperimentCleanupStrategy cleanupStrategy;
+
     @Column(name = "USER_NAME")
     public String userName;
 
@@ -264,6 +269,14 @@ public class ExperimentEntity implements Serializable {
         this.experimentStatus = experimentStatus;
     }
 
+    /**
+     * Returns the cleanup strategy stored in the {@code CLEANUP_STRATEGY} column.
+     *
+     * NOTE(review): the Thrift model spells this property {@code cleanUpStrategy} (capital U)
+     * while this entity uses {@code cleanupStrategy}; if entity/model conversion matches
+     * properties by name, confirm the two are actually mapped to each other.
+     *
+     * @return the cleanup strategy, possibly {@code null} when none has been set
+     */
+    public ExperimentCleanupStrategy getCleanupStrategy() {
+        return this.cleanupStrategy;
+    }
+
+    /**
+     * Sets the cleanup strategy persisted in the {@code CLEANUP_STRATEGY} column.
+     *
+     * @param cleanupStrategy strategy to store; stored as given, without validation
+     */
+    public void setCleanupStrategy(ExperimentCleanupStrategy cleanupStrategy) {
+        this.cleanupStrategy = cleanupStrategy;
+    }
+
     public List<ProcessEntity> getProcesses() {
         return processes;
     }
diff --git a/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py 
b/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
index abbcfe59b7..93d36e2732 100644
--- a/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
+++ b/dev-tools/airavata-python-sdk/airavata/model/experiment/ttypes.py
@@ -51,6 +51,14 @@ class ProjectSearchFields(IntEnum):
 
 
 
+class ExperimentCleanupStrategy(IntEnum):
+    NONE = 0
+    ALWAYS = 1
+    ONLY_COMPLETED = 2
+    ONLY_FAILED = 3
+
+
+
 class UserConfigurationDataModel(object):
     """
     A structure holding the experiment configuration.
@@ -298,12 +306,13 @@ class ExperimentModel(object):
      - errors
      - processes
      - workflow
+     - cleanUpStrategy
 
     """
     thrift_spec: typing.Any = None
 
 
-    def __init__(self, experimentId: str = "DO_NOT_SET_AT_CLIENTS", projectId: 
str = None, gatewayId: str = None, experimentType: ExperimentType =     
ExperimentType.SINGLE_APPLICATION, userName: str = None, experimentName: str = 
None, creationTime: typing.Optional[int] = None, description: 
typing.Optional[str] = None, executionId: typing.Optional[str] = None, 
gatewayExecutionId: typing.Optional[str] = None, gatewayInstanceId: 
typing.Optional[str] = None, enableEmailNotification: typing. [...]
+    def __init__(self, experimentId: str = "DO_NOT_SET_AT_CLIENTS", projectId: 
str = None, gatewayId: str = None, experimentType: ExperimentType =     
ExperimentType.SINGLE_APPLICATION, userName: str = None, experimentName: str = 
None, creationTime: typing.Optional[int] = None, description: 
typing.Optional[str] = None, executionId: typing.Optional[str] = None, 
gatewayExecutionId: typing.Optional[str] = None, gatewayInstanceId: 
typing.Optional[str] = None, enableEmailNotification: typing. [...]
         self.experimentId: str = experimentId
         self.projectId: str = projectId
         self.gatewayId: str = gatewayId
@@ -324,11 +333,15 @@ class ExperimentModel(object):
         self.errors: 
typing.Optional[list[airavata.model.commons.ttypes.ErrorModel]] = errors
         self.processes: 
typing.Optional[list[airavata.model.process.ttypes.ProcessModel]] = processes
         self.workflow: 
typing.Optional[airavata.model.workflow.ttypes.AiravataWorkflow] = workflow
+        self.cleanUpStrategy: typing.Optional[ExperimentCleanupStrategy] = 
cleanUpStrategy
 
     def __setattr__(self, name, value):
         if name == "experimentType":
             super().__setattr__(name, value if hasattr(value, 'value') else 
ExperimentType.__members__.get(value))
             return
+        if name == "cleanUpStrategy":
+            super().__setattr__(name, value if hasattr(value, 'value') else 
ExperimentCleanupStrategy.__members__.get(value))
+            return
         super().__setattr__(name, value)
 
 
@@ -478,6 +491,11 @@ class ExperimentModel(object):
                     self.workflow.read(iprot)
                 else:
                     iprot.skip(ftype)
+            elif fid == 21:
+                if ftype == TType.I32:
+                    self.cleanUpStrategy = 
ExperimentCleanupStrategy(iprot.readI32())
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -587,6 +605,10 @@ class ExperimentModel(object):
             oprot.writeFieldBegin('workflow', TType.STRUCT, 20)
             self.workflow.write(oprot)
             oprot.writeFieldEnd()
+        if self.cleanUpStrategy is not None:
+            oprot.writeFieldBegin('cleanUpStrategy', TType.I32, 21)
+            oprot.writeI32(self.cleanUpStrategy.value)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -1080,6 +1102,7 @@ ExperimentModel.thrift_spec = (
     (18, TType.LIST, 'errors', (TType.STRUCT, 
[airavata.model.commons.ttypes.ErrorModel, None], False), None, ),  # 18
     (19, TType.LIST, 'processes', (TType.STRUCT, 
[airavata.model.process.ttypes.ProcessModel, None], False), None, ),  # 19
     (20, TType.STRUCT, 'workflow', 
[airavata.model.workflow.ttypes.AiravataWorkflow, None], None, ),  # 20
+    (21, TType.I32, 'cleanUpStrategy', None,     
ExperimentCleanupStrategy.NONE, ),  # 21
 )
 all_structs.append(ExperimentSummaryModel)
 ExperimentSummaryModel.thrift_spec = (
diff --git a/thrift-interface-descriptions/data-models/experiment_model.thrift 
b/thrift-interface-descriptions/data-models/experiment_model.thrift
index 07ec759fe1..6d3ec9ebe2 100644
--- a/thrift-interface-descriptions/data-models/experiment_model.thrift
+++ b/thrift-interface-descriptions/data-models/experiment_model.thrift
@@ -73,6 +73,13 @@ struct UserConfigurationDataModel {
     13: optional list<scheduling_model.ComputationalResourceSchedulingModel> 
autoScheduledCompResourceSchedulingList,
 }
 
+/**
+ * Cleanup strategy that can be attached to an experiment.
+ *
+ * Values are numbered explicitly (identical to the implicit 0-based numbering) so the
+ * wire ids stay fixed if entries are ever reordered.
+ */
+enum ExperimentCleanupStrategy {
+    NONE = 0,
+    ALWAYS = 1,
+    ONLY_COMPLETED = 2,
+    ONLY_FAILED = 3
+}
+
 /**
  * A structure holding the experiment metadata and its child models.
  *
@@ -110,7 +117,8 @@ struct ExperimentModel {
     17: optional list<status_models.ExperimentStatus> experimentStatus,
     18: optional list<airavata_commons.ErrorModel> errors,
     19: optional list<process_model.ProcessModel> processes,
-    20: optional workflow_model.AiravataWorkflow workflow
+    20: optional workflow_model.AiravataWorkflow workflow,
+    21: optional ExperimentCleanupStrategy cleanUpStrategy = 
ExperimentCleanupStrategy.NONE
     }
 
 struct ExperimentSummaryModel {

Reply via email to