This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8281d36a5b Add cleanAfterStaged flag to delete source files after successful staging
8281d36a5b is described below

commit 8281d36a5b1e483ed274d274caff61f903083768
Author: lahiruj <[email protected]>
AuthorDate: Thu Nov 27 00:00:14 2025 -0500

    Add cleanAfterStaged flag to delete source files after successful staging
---
 .../init/03-appcatalog-migrations.sql              |   3 +
 .../impl/task/staging/OutputDataStagingTask.java   | 136 ++++++++++++++++++++-
 .../appcatalog/ApplicationInterfaceEntity.java     |  11 ++
 .../database_scripts/appcatalog-mysql.sql          |   1 +
 .../model/appcatalog/appinterface/ttypes.py        |  14 ++-
 .../data-models/application_interface_model.thrift |   3 +-
 6 files changed, 161 insertions(+), 7 deletions(-)

diff --git a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
index e3a1c3358d..e7ef4a55c8 100644
--- a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
+++ b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
@@ -17,3 +17,6 @@ ALTER TABLE COMPUTE_RESOURCE_RESERVATION ADD CONSTRAINT 
FK_COMPUTE_RESOURCE_RESE
 
 -- AIRAVATA-3369: Convert USER_FRIENDLY_DESC from VARCHAR to TEXT (CLOB)
 alter table APPLICATION_INPUT modify column USER_FRIENDLY_DESC TEXT;
+
+-- Add cleanAfterStaged flag to APPLICATION_INTERFACE
+ALTER TABLE APPLICATION_INTERFACE ADD COLUMN IF NOT EXISTS CLEAN_AFTER_STAGED 
SMALLINT DEFAULT 0;
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 8a80da1f4b..93d683e570 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -27,11 +27,13 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
 import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.DataType;
@@ -127,7 +129,8 @@ public class OutputDataStagingTask extends DataStagingTask {
             // Fetch and validate compute resource adaptor
             AgentAdaptor adaptor = 
getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
 
-            List<URI> destinationURIs = new ArrayList<URI>();
+            List<URI> destinationURIs = new ArrayList<>();
+            List<String> successfullyTransferredSourcePaths = new 
ArrayList<>();
 
             if (sourceFileName.contains("*")) {
                 // if file is declared as a wild card
@@ -180,15 +183,16 @@ public class OutputDataStagingTask extends 
DataStagingTask {
                             storageResourceAdaptor);
                     if (transferred) {
                         destinationURIs.add(destinationURI);
+                        
successfullyTransferredSourcePaths.add(newSourceURI.getPath());
                     } else {
-                        logger.warn("File " + sourceFileName + " did not 
transfer");
+                        logger.warn("File {} did not transfer", 
sourceFileName);
                     }
 
                     if (processOutput.getType() == DataType.URI) {
                         if (filePaths.size() > 1) {
                             logger.warn(
-                                    "More than one file matched wildcard, but 
output type is URI. Skipping remaining matches: "
-                                            + filePaths.subList(1, 
filePaths.size()));
+                                    "More than one file matched wildcard, but 
output type is URI. Skipping remaining matches: {}",
+                                    filePaths.subList(1, filePaths.size()));
                         }
                         break;
                     }
@@ -206,6 +210,31 @@ public class OutputDataStagingTask extends DataStagingTask 
{
                                         .map(this::escapeSpecialCharacters)
                                         .collect(Collectors.toList()));
                     }
+
+                    try {
+                        ApplicationInterfaceDescription appInterface =
+                                
getTaskContext().getApplicationInterfaceDescription();
+                        if (appInterface != null && 
appInterface.isCleanAfterStaged()) {
+                            logger.info(
+                                    "cleanAfterStaged is enabled, deleting 
source files after successful staging for task with the Id: {}",
+                                    getTaskId());
+                            // Delete only successfully transferred source 
files
+                            boolean allDeleted = 
deleteSourceFiles(successfullyTransferredSourcePaths, adaptor);
+                            if (!allDeleted) {
+                                logger.warn(
+                                        "Some source files could not be 
deleted after staging for task {}.",
+                                        getTaskId());
+                            } else if 
(!successfullyTransferredSourcePaths.isEmpty()) {
+                                logger.info(
+                                        "Successfully deleted all {} source 
file(s) after staging for task {}",
+                                        
successfullyTransferredSourcePaths.size(),
+                                        getTaskId());
+                            }
+                            deleteEmptyDirectoryIfNeeded(sourceParentPath, 
adaptor);
+                        }
+                    } catch (Exception e) {
+                        logger.warn("Failed to clean up source files after 
staging for task {}", getTaskId(), e);
+                    }
                 }
                 return onSuccess("Output data staging task " + getTaskId() + " 
successfully completed");
 
@@ -216,8 +245,33 @@ public class OutputDataStagingTask extends DataStagingTask 
{
                         sourceURI.getPath(), destinationURI.getPath(), 
sourceFileName, adaptor, storageResourceAdaptor);
                 if (transferred) {
                     saveExperimentOutput(processOutput.getName(), 
escapeSpecialCharacters(destinationURI.toString()));
+
+                    try {
+                        ApplicationInterfaceDescription appInterface =
+                                
getTaskContext().getApplicationInterfaceDescription();
+                        if (appInterface != null && 
appInterface.isCleanAfterStaged()) {
+                            logger.info(
+                                    "cleanAfterStaged is enabled, deleting 
source file after successful staging for task with the Id: {}",
+                                    getTaskId());
+                            boolean deleted = 
deleteSourceFiles(List.of(sourceURI.getPath()), adaptor);
+                            if (!deleted) {
+                                logger.warn("Source file could not be deleted 
after staging for task {}.", getTaskId());
+                            } else {
+                                logger.info("Successfully deleted source file 
after staging for task {}", getTaskId());
+                            }
+                            String sourceParentPath = (new 
File(sourceURI.getPath()))
+                                    .getParentFile()
+                                    .getPath();
+                            deleteEmptyDirectoryIfNeeded(sourceParentPath, 
adaptor);
+                        }
+                    } catch (Exception e) {
+                        logger.warn(
+                                "Failed to clean up source file after staging 
for task {}. Staging completed successfully.",
+                                getTaskId(),
+                                e);
+                    }
                 } else {
-                    logger.warn("File " + sourceFileName + " did not 
transfer");
+                    logger.warn("File {} did not transfer", sourceFileName);
                 }
                 return onSuccess("Output data staging task " + getTaskId() + " 
successfully completed");
             }
@@ -236,6 +290,78 @@ public class OutputDataStagingTask extends DataStagingTask 
{
         }
     }
 
+    private boolean deleteSourceFiles(List<String> filePaths, AgentAdaptor 
adaptor) {
+        boolean allSucceeded = true;
+        for (String filePath : filePaths) {
+            if (filePath == null || filePath.trim().isEmpty()) {
+                continue;
+            }
+            try {
+                String escapedPath = filePath.replace("'", "'\"'\"'");
+                String deleteCommand = "rm -f '" + escapedPath + "'";
+
+                logger.debug("Deleting source file: {}", filePath);
+                CommandOutput deleteOutput = 
adaptor.executeCommand(deleteCommand, null);
+
+                if (deleteOutput.getExitCode() != 0) {
+                    logger.warn(
+                            "Failed to delete source file {} (exit code: {}). 
Stdout: {}, Stderr: {}",
+                            filePath,
+                            deleteOutput.getExitCode(),
+                            deleteOutput.getStdOut(),
+                            deleteOutput.getStdError());
+                    allSucceeded = false;
+                } else {
+                    logger.debug("Successfully deleted source file: {}", 
filePath);
+                }
+            } catch (AgentException e) {
+                logger.warn("Exception while deleting source file {}: {}", 
filePath, e.getMessage(), e);
+                allSucceeded = false;
+            } catch (Exception e) {
+                logger.warn("Unexpected error while deleting source file {}: 
{}", filePath, e.getMessage(), e);
+                allSucceeded = false;
+            }
+        }
+        return allSucceeded;
+    }
+
+    private void deleteEmptyDirectoryIfNeeded(String directoryPath, 
AgentAdaptor adaptor) {
+        if (directoryPath == null || directoryPath.trim().isEmpty()) {
+            return;
+        }
+
+        try {
+            List<String> directoryContents = 
adaptor.listDirectory(directoryPath);
+            if (directoryContents == null || directoryContents.isEmpty()) {
+                String escapedPath = directoryPath.replace("'", "'\"'\"'");
+                String rmdirCommand = "rmdir '" + escapedPath + "'";
+
+                logger.debug("Removing empty directory: {}", directoryPath);
+                CommandOutput rmdirOutput = 
adaptor.executeCommand(rmdirCommand, null);
+
+                if (rmdirOutput.getExitCode() != 0) {
+                    logger.debug(
+                            "Could not remove directory {} (may not be empty 
or may have been removed already). Exit code: {}, Stderr: {}",
+                            directoryPath,
+                            rmdirOutput.getExitCode(),
+                            rmdirOutput.getStdError());
+                } else {
+                    logger.debug("Successfully removed empty directory: {}", 
directoryPath);
+                }
+            } else {
+                logger.debug(
+                        "Directory {} is not empty (contains {} items), 
skipping removal",
+                        directoryPath,
+                        directoryContents.size());
+            }
+        } catch (AgentException e) {
+            logger.debug("Could not check or remove directory {}: {}", 
directoryPath, e.getMessage());
+
+        } catch (Exception e) {
+            logger.debug("Unexpected error while checking directory {}: {}", 
directoryPath, e.getMessage());
+        }
+    }
+
     @Override
     public void onCancel(TaskContext taskContext) {}
 }
diff --git a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
index 2349e7ebe5..cdfe81927e 100644
--- a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
+++ b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
@@ -58,6 +58,9 @@ public class ApplicationInterfaceEntity implements 
Serializable {
     @Column(name = "HAS_OPTIONAL_FILE_INPUTS")
     private boolean hasOptionalFileInputs;
 
+    @Column(name = "CLEAN_AFTER_STAGED")
+    private boolean cleanAfterStaged;
+
     @ElementCollection(fetch = FetchType.EAGER)
     @CollectionTable(name = "APP_MODULE_MAPPING", joinColumns = 
@JoinColumn(name = "INTERFACE_ID"))
     @Column(name = "MODULE_ID")
@@ -145,6 +148,14 @@ public class ApplicationInterfaceEntity implements 
Serializable {
         this.hasOptionalFileInputs = hasOptionalFileInputs;
     }
 
+    public boolean getCleanAfterStaged() {
+        return cleanAfterStaged;
+    }
+
+    public void setCleanAfterStaged(boolean cleanAfterStaged) {
+        this.cleanAfterStaged = cleanAfterStaged;
+    }
+
     public List<String> getApplicationModules() {
         return applicationModules;
     }
diff --git a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
index 0d960e6541..1cb0c6b2c0 100644
--- a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
+++ b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
@@ -291,6 +291,7 @@ CREATE TABLE APPLICATION_INTERFACE
          GATEWAY_ID VARCHAR(255) NOT NULL,
          ARCHIVE_WORKING_DIRECTORY SMALLINT,
          HAS_OPTIONAL_FILE_INPUTS TINYINT(1),
+         CLEAN_AFTER_STAGED SMALLINT DEFAULT 0,
          CREATION_TIME TIMESTAMP DEFAULT NOW() NOT NULL,
          UPDATE_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL,
          PRIMARY KEY(INTERFACE_ID)
diff --git a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
index 6d57f492f3..bab92033fe 100644
--- a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
+++ b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
@@ -45,12 +45,13 @@ class ApplicationInterfaceDescription(object):
      - applicationOutputs
      - archiveWorkingDirectory
      - hasOptionalFileInputs
+     - cleanAfterStaged
 
     """
     thrift_spec: typing.Any = None
 
 
-    def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS", 
applicationName: str = None, applicationDescription: typing.Optional[str] = 
None, applicationModules: typing.Optional[list[str]] = None, applicationInputs: 
typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]] 
= None, applicationOutputs: 
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
 = None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...]
+    def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS", 
applicationName: str = None, applicationDescription: typing.Optional[str] = 
None, applicationModules: typing.Optional[list[str]] = None, applicationInputs: 
typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]] 
= None, applicationOutputs: 
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
 = None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...]
         self.applicationInterfaceId: str = applicationInterfaceId
         self.applicationName: str = applicationName
         self.applicationDescription: typing.Optional[str] = 
applicationDescription
@@ -59,6 +60,7 @@ class ApplicationInterfaceDescription(object):
         self.applicationOutputs: 
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
 = applicationOutputs
         self.archiveWorkingDirectory: typing.Optional[bool] = 
archiveWorkingDirectory
         self.hasOptionalFileInputs: typing.Optional[bool] = 
hasOptionalFileInputs
+        self.cleanAfterStaged: typing.Optional[bool] = cleanAfterStaged
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, 
TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -126,6 +128,11 @@ class ApplicationInterfaceDescription(object):
                     self.hasOptionalFileInputs = iprot.readBool()
                 else:
                     iprot.skip(ftype)
+            elif fid == 9:
+                if ftype == TType.BOOL:
+                    self.cleanAfterStaged = iprot.readBool()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -178,6 +185,10 @@ class ApplicationInterfaceDescription(object):
             oprot.writeFieldBegin('hasOptionalFileInputs', TType.BOOL, 8)
             oprot.writeBool(self.hasOptionalFileInputs)
             oprot.writeFieldEnd()
+        if self.cleanAfterStaged is not None:
+            oprot.writeFieldBegin('cleanAfterStaged', TType.BOOL, 9)
+            oprot.writeBool(self.cleanAfterStaged)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -209,6 +220,7 @@ ApplicationInterfaceDescription.thrift_spec = (
     (6, TType.LIST, 'applicationOutputs', (TType.STRUCT, 
[airavata.model.application.io.ttypes.OutputDataObjectType, None], False), 
None, ),  # 6
     (7, TType.BOOL, 'archiveWorkingDirectory', None, False, ),  # 7
     (8, TType.BOOL, 'hasOptionalFileInputs', None, None, ),  # 8
+    (9, TType.BOOL, 'cleanAfterStaged', None, False, ),  # 9
 )
 fix_spec(all_structs)
 del all_structs
diff --git a/thrift-interface-descriptions/data-models/application_interface_model.thrift b/thrift-interface-descriptions/data-models/application_interface_model.thrift
index a8b4c966ec..88ea6cb386 100644
--- a/thrift-interface-descriptions/data-models/application_interface_model.thrift
+++ b/thrift-interface-descriptions/data-models/application_interface_model.thrift
@@ -55,5 +55,6 @@ struct ApplicationInterfaceDescription {
     5: optional list<application_io_models.InputDataObjectType> 
applicationInputs,
     6: optional list<application_io_models.OutputDataObjectType> 
applicationOutputs,
     7: optional bool archiveWorkingDirectory = 0,
-    8: optional bool hasOptionalFileInputs
+    8: optional bool hasOptionalFileInputs,
+    9: optional bool cleanAfterStaged = 0
 }
\ No newline at end of file

Reply via email to