This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 8281d36a5b Add cleanAfterStaged flag to delete source files after
successful staging
8281d36a5b is described below
commit 8281d36a5b1e483ed274d274caff61f903083768
Author: lahiruj <[email protected]>
AuthorDate: Thu Nov 27 00:00:14 2025 -0500
Add cleanAfterStaged flag to delete source files after successful staging
---
.../init/03-appcatalog-migrations.sql | 3 +
.../impl/task/staging/OutputDataStagingTask.java | 136 ++++++++++++++++++++-
.../appcatalog/ApplicationInterfaceEntity.java | 11 ++
.../database_scripts/appcatalog-mysql.sql | 1 +
.../model/appcatalog/appinterface/ttypes.py | 14 ++-
.../data-models/application_interface_model.thrift | 3 +-
6 files changed, 161 insertions(+), 7 deletions(-)
diff --git a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
index e3a1c3358d..e7ef4a55c8 100644
--- a/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
+++ b/.devcontainer/database_scripts/init/03-appcatalog-migrations.sql
@@ -17,3 +17,6 @@ ALTER TABLE COMPUTE_RESOURCE_RESERVATION ADD CONSTRAINT
FK_COMPUTE_RESOURCE_RESE
-- AIRAVATA-3369: Convert USER_FRIENDLY_DESC from VARCHAR to TEXT (CLOB)
alter table APPLICATION_INPUT modify column USER_FRIENDLY_DESC TEXT;
+
+-- Add cleanAfterStaged flag to APPLICATION_INTERFACE
+-- Existing rows receive 0 (flag off) through the column default.
+-- NOTE(review): "ADD COLUMN IF NOT EXISTS" is MariaDB syntax; MySQL 8 rejects it.
+-- Confirm the devcontainer database is MariaDB before relying on this script.
+ALTER TABLE APPLICATION_INTERFACE ADD COLUMN IF NOT EXISTS CLEAN_AFTER_STAGED
SMALLINT DEFAULT 0;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 8a80da1f4b..93d683e570 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -27,11 +27,13 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
import
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.DataType;
@@ -127,7 +129,8 @@ public class OutputDataStagingTask extends DataStagingTask {
// Fetch and validate compute resource adaptor
AgentAdaptor adaptor =
getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
- List<URI> destinationURIs = new ArrayList<URI>();
+ List<URI> destinationURIs = new ArrayList<>();
+ List<String> successfullyTransferredSourcePaths = new
ArrayList<>();
if (sourceFileName.contains("*")) {
// if file is declared as a wild card
@@ -180,15 +183,16 @@ public class OutputDataStagingTask extends
DataStagingTask {
storageResourceAdaptor);
if (transferred) {
destinationURIs.add(destinationURI);
+
successfullyTransferredSourcePaths.add(newSourceURI.getPath());
} else {
- logger.warn("File " + sourceFileName + " did not
transfer");
+ logger.warn("File {} did not transfer",
sourceFileName);
}
if (processOutput.getType() == DataType.URI) {
if (filePaths.size() > 1) {
logger.warn(
- "More than one file matched wildcard, but
output type is URI. Skipping remaining matches: "
- + filePaths.subList(1,
filePaths.size()));
+ "More than one file matched wildcard, but
output type is URI. Skipping remaining matches: {}",
+ filePaths.subList(1, filePaths.size()));
}
break;
}
@@ -206,6 +210,31 @@ public class OutputDataStagingTask extends DataStagingTask
{
.map(this::escapeSpecialCharacters)
.collect(Collectors.toList()));
}
+
+ try {
+ ApplicationInterfaceDescription appInterface =
+
getTaskContext().getApplicationInterfaceDescription();
+ if (appInterface != null &&
appInterface.isCleanAfterStaged()) {
+ logger.info(
+ "cleanAfterStaged is enabled, deleting
source files after successful staging for task with the Id: {}",
+ getTaskId());
+ // Delete only successfully transferred source
files
+ boolean allDeleted =
deleteSourceFiles(successfullyTransferredSourcePaths, adaptor);
+ if (!allDeleted) {
+ logger.warn(
+ "Some source files could not be
deleted after staging for task {}.",
+ getTaskId());
+ } else if
(!successfullyTransferredSourcePaths.isEmpty()) {
+ logger.info(
+ "Successfully deleted all {} source
file(s) after staging for task {}",
+
successfullyTransferredSourcePaths.size(),
+ getTaskId());
+ }
+ deleteEmptyDirectoryIfNeeded(sourceParentPath,
adaptor);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to clean up source files after
staging for task {}", getTaskId(), e);
+ }
}
return onSuccess("Output data staging task " + getTaskId() + "
successfully completed");
@@ -216,8 +245,33 @@ public class OutputDataStagingTask extends DataStagingTask
{
sourceURI.getPath(), destinationURI.getPath(),
sourceFileName, adaptor, storageResourceAdaptor);
if (transferred) {
saveExperimentOutput(processOutput.getName(),
escapeSpecialCharacters(destinationURI.toString()));
+
+ try {
+ ApplicationInterfaceDescription appInterface =
+
getTaskContext().getApplicationInterfaceDescription();
+ if (appInterface != null &&
appInterface.isCleanAfterStaged()) {
+ logger.info(
+ "cleanAfterStaged is enabled, deleting
source file after successful staging for task with the Id: {}",
+ getTaskId());
+ boolean deleted =
deleteSourceFiles(List.of(sourceURI.getPath()), adaptor);
+ if (!deleted) {
+ logger.warn("Source file could not be deleted
after staging for task {}.", getTaskId());
+ } else {
+ logger.info("Successfully deleted source file
after staging for task {}", getTaskId());
+ }
+ String sourceParentPath = (new
File(sourceURI.getPath()))
+ .getParentFile()
+ .getPath();
+ deleteEmptyDirectoryIfNeeded(sourceParentPath,
adaptor);
+ }
+ } catch (Exception e) {
+ logger.warn(
+ "Failed to clean up source file after staging
for task {}. Staging completed successfully.",
+ getTaskId(),
+ e);
+ }
} else {
- logger.warn("File " + sourceFileName + " did not
transfer");
+ logger.warn("File {} did not transfer", sourceFileName);
}
return onSuccess("Output data staging task " + getTaskId() + "
successfully completed");
}
@@ -236,6 +290,78 @@ public class OutputDataStagingTask extends DataStagingTask
{
}
}
+ /**
+ * Deletes each given file on the compute resource by running a shell
+ * {@code rm -f} command through the supplied adaptor.
+ *
+ * <p>Null or blank paths are skipped. A failure on one path (non-zero exit
+ * code or any exception) is logged and does not stop the remaining deletions.
+ *
+ * @param filePaths remote paths of the source files to remove
+ * @param adaptor compute resource adaptor used to execute the commands
+ * @return {@code true} if every non-blank path's command exited with 0,
+ * {@code false} if any deletion failed
+ */
+ private boolean deleteSourceFiles(List<String> filePaths, AgentAdaptor
adaptor) {
+ boolean allSucceeded = true;
+ for (String filePath : filePaths) {
+ if (filePath == null || filePath.trim().isEmpty()) {
+ continue;
+ }
+ try {
+ // Single-quote the path for the remote shell: each embedded ' is
+ // closed, emitted as "'", and reopened ('"'"'), so no shell
+ // metacharacters inside the path are interpreted.
+ String escapedPath = filePath.replace("'", "'\"'\"'");
+ String deleteCommand = "rm -f '" + escapedPath + "'";
+
+ logger.debug("Deleting source file: {}", filePath);
+ // NOTE(review): assumes executeCommand never returns null - confirm
+ // against the AgentAdaptor implementations.
+ CommandOutput deleteOutput =
adaptor.executeCommand(deleteCommand, null);
+
+ // rm -f also exits 0 when the file is already absent, so a missing
+ // file is counted as a successful deletion.
+ if (deleteOutput.getExitCode() != 0) {
+ logger.warn(
+ "Failed to delete source file {} (exit code: {}).
Stdout: {}, Stderr: {}",
+ filePath,
+ deleteOutput.getExitCode(),
+ deleteOutput.getStdOut(),
+ deleteOutput.getStdError());
+ allSucceeded = false;
+ } else {
+ logger.debug("Successfully deleted source file: {}",
filePath);
+ }
+ } catch (AgentException e) {
+ logger.warn("Exception while deleting source file {}: {}",
filePath, e.getMessage(), e);
+ allSucceeded = false;
+ } catch (Exception e) {
+ // Best-effort cleanup: any other failure is recorded and the loop
+ // moves on to the next path.
+ logger.warn("Unexpected error while deleting source file {}:
{}", filePath, e.getMessage(), e);
+ allSucceeded = false;
+ }
+ }
+ return allSucceeded;
+ }
+
+ /**
+ * Best-effort removal of a directory on the compute resource once it has no
+ * remaining entries.
+ *
+ * <p>The directory is listed through the adaptor. Only when the listing is
+ * null or empty is {@code rmdir} run. Every failure (non-zero exit code or
+ * exception) is logged at debug level and otherwise ignored, so cleanup never
+ * fails the staging task.
+ *
+ * <p>NOTE(review): callers pass the parent directory of the staged source
+ * file, which may be the job's working directory. Confirm that removing it
+ * when empty is intended.
+ *
+ * @param directoryPath remote directory to remove; null or blank is a no-op
+ * @param adaptor compute resource adaptor used to list and remove the directory
+ */
+ private void deleteEmptyDirectoryIfNeeded(String directoryPath,
AgentAdaptor adaptor) {
+ if (directoryPath == null || directoryPath.trim().isEmpty()) {
+ return;
+ }
+
+ try {
+ // NOTE(review): whether listDirectory includes "." / ".." entries
+ // depends on the adaptor. If it does, this branch never runs - confirm.
+ List<String> directoryContents =
adaptor.listDirectory(directoryPath);
+ // A null listing is treated the same as an empty one.
+ if (directoryContents == null || directoryContents.isEmpty()) {
+ String escapedPath = directoryPath.replace("'", "'\"'\"'");
+ // rmdir refuses to remove a non-empty directory, so a file created
+ // after the listing above cannot be lost by this command.
+ String rmdirCommand = "rmdir '" + escapedPath + "'";
+
+ logger.debug("Removing empty directory: {}", directoryPath);
+ CommandOutput rmdirOutput =
adaptor.executeCommand(rmdirCommand, null);
+
+ if (rmdirOutput.getExitCode() != 0) {
+ logger.debug(
+ "Could not remove directory {} (may not be empty
or may have been removed already). Exit code: {}, Stderr: {}",
+ directoryPath,
+ rmdirOutput.getExitCode(),
+ rmdirOutput.getStdError());
+ } else {
+ logger.debug("Successfully removed empty directory: {}",
directoryPath);
+ }
+ } else {
+ logger.debug(
+ "Directory {} is not empty (contains {} items),
skipping removal",
+ directoryPath,
+ directoryContents.size());
+ }
+ } catch (AgentException e) {
+ // Deliberately swallowed: directory removal is optional cleanup.
+ logger.debug("Could not check or remove directory {}: {}",
directoryPath, e.getMessage());
+
+ } catch (Exception e) {
+ logger.debug("Unexpected error while checking directory {}: {}",
directoryPath, e.getMessage());
+ }
+ }
+
@Override
public void onCancel(TaskContext taskContext) {}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
index 2349e7ebe5..cdfe81927e 100644
---
a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
+++
b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/ApplicationInterfaceEntity.java
@@ -58,6 +58,9 @@ public class ApplicationInterfaceEntity implements
Serializable {
@Column(name = "HAS_OPTIONAL_FILE_INPUTS")
private boolean hasOptionalFileInputs;
+ // Persisted value of the interface's cleanAfterStaged flag (delete source
+ // files after successful output staging). The CLEAN_AFTER_STAGED column is
+ // declared SMALLINT DEFAULT 0 in the schema scripts.
+ @Column(name = "CLEAN_AFTER_STAGED")
+ private boolean cleanAfterStaged;
+
@ElementCollection(fetch = FetchType.EAGER)
@CollectionTable(name = "APP_MODULE_MAPPING", joinColumns =
@JoinColumn(name = "INTERFACE_ID"))
@Column(name = "MODULE_ID")
@@ -145,6 +148,14 @@ public class ApplicationInterfaceEntity implements
Serializable {
this.hasOptionalFileInputs = hasOptionalFileInputs;
}
+ /**
+ * Returns the value stored in the {@code CLEAN_AFTER_STAGED} column.
+ *
+ * @return whether source files should be deleted after successful staging
+ */
+ public boolean getCleanAfterStaged() {
+ return cleanAfterStaged;
+ }
+
+ /**
+ * Sets the value persisted to the {@code CLEAN_AFTER_STAGED} column.
+ *
+ * @param cleanAfterStaged whether source files should be deleted after
+ * successful staging
+ */
+ public void setCleanAfterStaged(boolean cleanAfterStaged) {
+ this.cleanAfterStaged = cleanAfterStaged;
+ }
+
public List<String> getApplicationModules() {
return applicationModules;
}
diff --git
a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
index 0d960e6541..1cb0c6b2c0 100644
--- a/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
+++ b/airavata-api/src/main/resources/database_scripts/appcatalog-mysql.sql
@@ -291,6 +291,7 @@ CREATE TABLE APPLICATION_INTERFACE
GATEWAY_ID VARCHAR(255) NOT NULL,
ARCHIVE_WORKING_DIRECTORY SMALLINT,
HAS_OPTIONAL_FILE_INPUTS TINYINT(1),
+ CLEAN_AFTER_STAGED SMALLINT DEFAULT 0,
CREATION_TIME TIMESTAMP DEFAULT NOW() NOT NULL,
UPDATE_TIME TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL,
PRIMARY KEY(INTERFACE_ID)
diff --git
a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
index 6d57f492f3..bab92033fe 100644
---
a/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
+++
b/dev-tools/airavata-python-sdk/airavata/model/appcatalog/appinterface/ttypes.py
@@ -45,12 +45,13 @@ class ApplicationInterfaceDescription(object):
- applicationOutputs
- archiveWorkingDirectory
- hasOptionalFileInputs
+ - cleanAfterStaged
"""
thrift_spec: typing.Any = None
- def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS",
applicationName: str = None, applicationDescription: typing.Optional[str] =
None, applicationModules: typing.Optional[list[str]] = None, applicationInputs:
typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]]
= None, applicationOutputs:
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
= None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...]
+ def __init__(self, applicationInterfaceId: str = "DO_NOT_SET_AT_CLIENTS",
applicationName: str = None, applicationDescription: typing.Optional[str] =
None, applicationModules: typing.Optional[list[str]] = None, applicationInputs:
typing.Optional[list[airavata.model.application.io.ttypes.InputDataObjectType]]
= None, applicationOutputs:
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
= None, archiveWorkingDirectory: typing.Optional[bool] = False, hasOp [...]
self.applicationInterfaceId: str = applicationInterfaceId
self.applicationName: str = applicationName
self.applicationDescription: typing.Optional[str] =
applicationDescription
@@ -59,6 +60,7 @@ class ApplicationInterfaceDescription(object):
self.applicationOutputs:
typing.Optional[list[airavata.model.application.io.ttypes.OutputDataObjectType]]
= applicationOutputs
self.archiveWorkingDirectory: typing.Optional[bool] =
archiveWorkingDirectory
self.hasOptionalFileInputs: typing.Optional[bool] =
hasOptionalFileInputs
+ self.cleanAfterStaged: typing.Optional[bool] = cleanAfterStaged
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans,
TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -126,6 +128,11 @@ class ApplicationInterfaceDescription(object):
self.hasOptionalFileInputs = iprot.readBool()
else:
iprot.skip(ftype)
+ elif fid == 9:
+ if ftype == TType.BOOL:
+ self.cleanAfterStaged = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -178,6 +185,10 @@ class ApplicationInterfaceDescription(object):
oprot.writeFieldBegin('hasOptionalFileInputs', TType.BOOL, 8)
oprot.writeBool(self.hasOptionalFileInputs)
oprot.writeFieldEnd()
+ if self.cleanAfterStaged is not None:
+ oprot.writeFieldBegin('cleanAfterStaged', TType.BOOL, 9)
+ oprot.writeBool(self.cleanAfterStaged)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -209,6 +220,7 @@ ApplicationInterfaceDescription.thrift_spec = (
(6, TType.LIST, 'applicationOutputs', (TType.STRUCT,
[airavata.model.application.io.ttypes.OutputDataObjectType, None], False),
None, ), # 6
(7, TType.BOOL, 'archiveWorkingDirectory', None, False, ), # 7
(8, TType.BOOL, 'hasOptionalFileInputs', None, None, ), # 8
+ (9, TType.BOOL, 'cleanAfterStaged', None, False, ), # 9
)
fix_spec(all_structs)
del all_structs
diff --git
a/thrift-interface-descriptions/data-models/application_interface_model.thrift
b/thrift-interface-descriptions/data-models/application_interface_model.thrift
index a8b4c966ec..88ea6cb386 100644
---
a/thrift-interface-descriptions/data-models/application_interface_model.thrift
+++
b/thrift-interface-descriptions/data-models/application_interface_model.thrift
@@ -55,5 +55,6 @@ struct ApplicationInterfaceDescription {
5: optional list<application_io_models.InputDataObjectType>
applicationInputs,
6: optional list<application_io_models.OutputDataObjectType>
applicationOutputs,
7: optional bool archiveWorkingDirectory = 0,
- 8: optional bool hasOptionalFileInputs
+ 8: optional bool hasOptionalFileInputs,
+ 9: optional bool cleanAfterStaged = 0
}
\ No newline at end of file