This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch data-cleanup in repository https://gitbox.apache.org/repos/asf/airavata.git
commit f0d9dff86345d44a4a765ae4f050387e9e34f0e1 Author: lahiruj <[email protected]> AuthorDate: Thu Nov 27 00:00:14 2025 -0500 Add cleanAfterStaged flag to delete source files after successful staging --- .../init/03-appcatalog-migrations.sql | 3 + .../impl/task/staging/OutputDataStagingTask.java | 136 ++++++++++++++++++++- .../appcatalog/ApplicationInterfaceEntity.java | 11 ++ .../database_scripts/appcatalog-mysql.sql | 1 + .../model/appcatalog/appinterface/ttypes.py | 14 ++- .../data-models/application_interface_model.thrift | 3 +- 6 files changed, 161 insertions(+), 7 deletions(-) diff --git a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql index e3a1c3358d..e7ef4a55c8 100644 --- a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql +++ b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql @@ -17,3 +17,6 @@ ALTER TABLE COMPUTE_RESOURCE_RESERVATION ADD CONSTRAINT FK_COMPUTE_RESOURCE_RESE -- AIRAVATA-3369: Convert USER_FRIENDLY_DESC from VARCHAR to TEXT (CLOB) alter table APPLICATION_INPUT modify column USER_FRIENDLY_DESC TEXT; + +-- Add cleanAfterStaged flag to APPLICATION_INTERFACE +ALTER TABLE APPLICATION_INTERFACE ADD COLUMN IF NOT EXISTS CLEAN_AFTER_STAGED SMALLINT DEFAULT 0; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java index 8a80da1f4b..93d683e570 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java @@ -27,11 +27,13 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.AgentException; +import org.apache.airavata.agents.api.CommandOutput; import org.apache.airavata.agents.api.StorageResourceAdaptor; import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.TaskOnFailException; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.DataType; @@ -127,7 +129,8 @@ public class OutputDataStagingTask extends DataStagingTask { // Fetch and validate compute resource adaptor AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport()); - List<URI> destinationURIs = new ArrayList<URI>(); + List<URI> destinationURIs = new ArrayList<>(); + List<String> successfullyTransferredSourcePaths = new ArrayList<>(); if (sourceFileName.contains("*")) { // if file is declared as a wild card @@ -180,15 +183,16 @@ public class OutputDataStagingTask extends DataStagingTask { storageResourceAdaptor); if (transferred) { destinationURIs.add(destinationURI); + successfullyTransferredSourcePaths.add(newSourceURI.getPath()); } else { - logger.warn("File " + sourceFileName + " did not transfer"); + logger.warn("File {} did not transfer", sourceFileName); } if (processOutput.getType() == DataType.URI) { if (filePaths.size() > 1) { logger.warn( - "More than one file matched wildcard, but output type is URI. Skipping remaining matches: " - + filePaths.subList(1, filePaths.size())); + "More than one file matched wildcard, but output type is URI. Skipping remaining matches: {}", + filePaths.subList(1, filePaths.size())); } break; } @@ -206,6 +210,31 @@ public class OutputDataStagingTask extends DataStagingTask { .map(this::escapeSpecialCharacters) .collect(Collectors.toList())); } + + try { + ApplicationInterfaceDescription appInterface = + getTaskContext().getApplicationInterfaceDescription(); + if (appInterface != null && appInterface.isCleanAfterStaged()) { + logger.info( + "cleanAfterStaged is enabled, deleting source files after successful staging for task with the Id: {}", + getTaskId()); + // Delete only successfully transferred source files + boolean allDeleted = deleteSourceFiles(successfullyTransferredSourcePaths, adaptor); + if (!allDeleted) { + logger.warn( + "Some source files could not be deleted after staging for task {}.", + getTaskId()); + } else if (!successfullyTransferredSourcePaths.isEmpty()) { + logger.info( + "Successfully deleted all {} source file(s) after staging for task {}", + successfullyTransferredSourcePaths.size(), + getTaskId()); + } + deleteEmptyDirectoryIfNeeded(sourceParentPath, adaptor); + } + } catch (Exception e) { + logger.warn("Failed to clean up source files after staging for task {}", getTaskId(), e); + } } return onSuccess("Output data staging task " + getTaskId() + " successfully completed"); @@ -216,8 +245,33 @@ public class OutputDataStagingTask extends DataStagingTask { sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor); if (transferred) { saveExperimentOutput(processOutput.getName(), escapeSpecialCharacters(destinationURI.toString())); + + try { + ApplicationInterfaceDescription appInterface = + getTaskContext().getApplicationInterfaceDescription(); + if (appInterface != null && appInterface.isCleanAfterStaged()) { + logger.info( + "cleanAfterStaged is enabled, deleting source file after successful staging for task with the Id: {}", + getTaskId()); + boolean deleted = deleteSourceFiles(List.of(sourceURI.getPath()), adaptor); + if (!deleted) { + logger.warn("Source file could not be deleted after staging for task {}.", getTaskId()); + } else { + logger.info("Successfully deleted source file after staging for task {}", getTaskId()); + } + String sourceParentPath = (new File(sourceURI.getPath())) + .getParentFile() + .getPath(); + deleteEmptyDirectoryIfNeeded(sourceParentPath, adaptor); + } + } catch (Exception e) { + logger.warn( + "Failed to clean up source file after staging for task {}. Staging completed successfully.", + getTaskId(), + e); + } } else { - logger.warn("File " + sourceFileName + " did not transfer"); + logger.warn("File {} did not transfer", sourceFileName); } return onSuccess("Output data staging task " + getTaskId() + " successfully completed"); } @@ -236,6 +290,78 @@ public class OutputDataStagingTask extends DataStagingTask { } } + private boolean deleteSourceFiles(List<String> filePaths, AgentAdaptor adaptor) { + boolean allSucceeded = true; + for (String filePath : filePaths) { + if (filePath == null || filePath.trim().isEmpty()) { + continue; + } + try { + String escapedPath = filePath.replace("'", "'\"'\"'"); + String deleteCommand = "rm -f '" + escapedPath + "'"; + + logger.debug("Deleting source file: {}", filePath); + CommandOutput deleteOutput = adaptor.executeCommand(deleteCommand, null); + + if (deleteOutput.getExitCode() != 0) { + logger.warn( + "Failed to delete source file {} (exit code: {}). Stdout: {}, Stderr: {}", + filePath, + deleteOutput.getExitCode(), + deleteOutput.getStdOut(), + deleteOutput.getStdError()); + allSucceeded = false; + } else { + logger.debug("Successfully deleted source file: {}", filePath); + } + } catch (AgentException e) { + logger.warn("Exception while deleting source file {}: {}", filePath, e.getMessage(), e); + allSucceeded = false; + } catch (Exception e) { + logger.warn("Unexpected error while deleting source file {}: {}", filePath, e.getMessage(), e); + allSucceeded = false; + } + } + return allSucceeded; + } + + private void deleteEmptyDirectoryIfNeeded(String directoryPath, AgentAdaptor adaptor) { + if (directoryPath == null || directoryPath.trim().isEmpty()) { + return; + } + + try { + List<String> directoryContents = adaptor.listDirectory(directoryPath); + if (directoryContents == null || directoryContents.isEmpty()) { + String escapedPath = directoryPath.replace("'", "'\"'\"'"); + String rmdirCommand = "rmdir '" + escapedPath + "'"; + + logger.debug("Removing empty directory: {}", directoryPath); + CommandOutput rmdirOutput = adaptor.executeCommand(rmdirCommand, null); + + if (rmdirOutput.getExitCode() != 0) { + logger.debug( + "Could not remove directory {} (may not be empty or may have been removed already). Exit code: {}, Stderr: {}", + directoryPath, + rmdirOutput.getExitCode(), + rmdirOutput.getStdError()); + } else { + logger.debug("Successfully removed empty directory: {}", directoryPath); + } + } else { + logger.debug( + "Directory {} is not empty (contains {} items), skipping removal", + directoryPath, + directoryContents.size()); + } + } catch (AgentException e) { + logger.debug("Could not check or remove directory {}: {}", directoryPath, e.getMessage()); + + } catch (Exception e) { + logger.debug("Unexpected error while checking directory {}: {}", directoryPath, e.getMessage()); + } + } + @Override public void onCancel(TaskContext taskContext) {} } diff --git a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java index 2349e7ebe5..cdfe81927e 100644 --- a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java +++ b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java @@ -58,6 +58,9 @@ public class ApplicationInterfaceEntity implements Serializable { @Column(name = "HAS_OPTIONAL_FILE_INPUTS") private boolean hasOptionalFileInputs; + @Column(name = "CLEAN_AFTER_STAGED") + private boolean cleanAfterStaged; + @ElementCollection(fetch = FetchType.EAGER) @CollectionTable(name = "APP_MODULE_MAPPING", joinColumns = @JoinColumn(name = "INTERFACE_ID")) @Column(name = "MODULE_ID") @@ -145,6 +148,14 @@ public class ApplicationInterfaceEntity implements Serializable { this.hasOptionalFileInputs = hasOptionalFileInputs; } + public boolean getCleanAfterStaged() { + return cleanAfterStaged; + } + + public void setCleanAfterStaged(boolean cleanAfterStaged) { + this.cleanAfterStaged = cleanAfterStaged; + } + public List<String> getApplicationModules() { return applicationModules; } diff --git a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql index 0d960e6541..1cb0c6b2c0 100644 --- a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql +++ b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql @@ -291,6 +291,7 @@ CREATE TABLE APPLICATION_INTERFACE GATEWAY_ID VARCHAR(255) NOT NULL, ARCHIVE_WORKING_DIRECTORY SMALLINT, HAS_OPTIONAL_FILE_INPUTS TINYINT(1), + CLEAN_AFTER_STAGED SMALLINT DEFAULT 0, CREATION_TIME TIMESTAMP DEFAULT NOW() NOT NULL, UPDATE_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL, PRIMARY KEY(INTERFACE_ID) diff --git a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py index 6d57f492f3..bab92033fe 100644 --- a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py +++ b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py @@ -45,12 +45,13 @@ class ApplicationInterfaceDescription(object): - applicationOutputs - archiveWorkingDirectory - hasOptionalFileInputs + - cleanAfterStaged """ thrift_spec: typing.Any = None - def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS", applicationName: str = None, applicationDescription: typing.Optional[str] = None, applicationModules: typing.Optional[list[str]] = None, applicationInputs: typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]] = None, applicationOutputs: typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]] = None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...] + def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS", applicationName: str = None, applicationDescription: typing.Optional[str] = None, applicationModules: typing.Optional[list[str]] = None, applicationInputs: typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]] = None, applicationOutputs: typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]] = None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...] self.applicationInterfaceId: str = applicationInterfaceId self.applicationName: str = applicationName self.applicationDescription: typing.Optional[str] = applicationDescription @@ -59,6 +60,7 @@ class ApplicationInterfaceDescription(object): self.applicationOutputs: typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]] = applicationOutputs self.archiveWorkingDirectory: typing.Optional[bool] = archiveWorkingDirectory self.hasOptionalFileInputs: typing.Optional[bool] = hasOptionalFileInputs + self.cleanAfterStaged: typing.Optional[bool] = cleanAfterStaged def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -126,6 +128,11 @@ class ApplicationInterfaceDescription(object): self.hasOptionalFileInputs = iprot.readBool() else: iprot.skip(ftype) + elif fid == 9: + if ftype == TType.BOOL: + self.cleanAfterStaged = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -178,6 +185,10 @@ class ApplicationInterfaceDescription(object): oprot.writeFieldBegin('hasOptionalFileInputs', TType.BOOL, 8) oprot.writeBool(self.hasOptionalFileInputs) oprot.writeFieldEnd() + if self.cleanAfterStaged is not None: + oprot.writeFieldBegin('cleanAfterStaged', TType.BOOL, 9) + oprot.writeBool(self.cleanAfterStaged) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -209,6 +220,7 @@ ApplicationInterfaceDescription.thrift_spec = ( (6, TType.LIST, 'applicationOutputs', (TType.STRUCT, [airavata.model.application.io.ttypes.OutputDataObjectType, None], False), None, ), # 6 (7, TType.BOOL, 'archiveWorkingDirectory', None, False, ), # 7 (8, TType.BOOL, 'hasOptionalFileInputs', None, None, ), # 8 + (9, TType.BOOL, 'cleanAfterStaged', None, False, ), # 9 ) fix_spec(all_structs) del all_structs diff --git a/thrift-interface-descriptions/data-models/application_interface_model.thrift b/thrift-interface-descriptions/data-models/application_interface_model.thrift index a8b4c966ec..88ea6cb386 100644 --- a/thrift-interface-descriptions/data-models/application_interface_model.thrift +++ b/thrift-interface-descriptions/data-models/application_interface_model.thrift @@ -55,5 +55,6 @@ struct ApplicationInterfaceDescription { 5: optional list<application_io_models.InputDataObjectType> applicationInputs, 6: optional list<application_io_models.OutputDataObjectType> applicationOutputs, 7: optional bool archiveWorkingDirectory = 0, - 8: optional bool hasOptionalFileInputs + 8: optional bool hasOptionalFileInputs, + 9: optional bool cleanAfterStaged = 0 } \ No newline at end of file
