This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch storage-update in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 927be611a6a2d5a8bd1a4274910e988b2365f660 Author: lahiruj <[email protected]> AuthorDate: Fri Oct 24 17:28:00 2025 -0400 Add support for an optional output storage resource for experiments --- .../init/04-expcatalog-migrations.sql | 6 +++- .../airavata/helix/impl/task/TaskContext.java | 21 ++++++++++++ .../helix/impl/task/staging/DataStagingTask.java | 37 ++++++++++++++++++++++ .../impl/task/staging/OutputDataStagingTask.java | 13 ++++---- .../airavata/model/util/ExperimentModelUtil.java | 1 + .../core/entities/expcatalog/ProcessEntity.java | 11 +++++++ .../expcatalog/UserConfigurationDataEntity.java | 11 +++++++ .../data-models/experiment_model.thrift | 1 + .../data-models/process_model.thrift | 1 + 9 files changed, 95 insertions(+), 7 deletions(-) diff --git a/.devcontainer/database_scripts/init/04-expcatalog-migrations.sql b/.devcontainer/database_scripts/init/04-expcatalog-migrations.sql index e3ebde0753..198f57d6d6 100644 --- a/.devcontainer/database_scripts/init/04-expcatalog-migrations.sql +++ b/.devcontainer/database_scripts/init/04-expcatalog-migrations.sql @@ -30,4 +30,8 @@ CREATE TABLE IF NOT EXISTS COMPUTE_RESOURCE_SCHEDULING ( PARALLEL_GROUP_COUNT INT, PRIMARY KEY (EXPERIMENT_ID,RESOURCE_HOST_ID,QUEUE_NAME), FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE -)ENGINE=InnoDB DEFAULT CHARSET=latin1; \ No newline at end of file +)ENGINE=InnoDB DEFAULT CHARSET=latin1; + +-- Add output storage resource ID columns +ALTER TABLE USER_CONFIGURATION_DATA ADD COLUMN IF NOT EXISTS OUTPUT_STORAGE_RESOURCE_ID VARCHAR(255) DEFAULT NULL; +ALTER TABLE PROCESS ADD COLUMN IF NOT EXISTS OUTPUT_STORAGE_RESOURCE_ID VARCHAR(255) DEFAULT NULL; \ No newline at end of file diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index a5491f3f53..a5d148ab5b 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -739,6 +739,27 @@ public class TaskContext { return getGatewayStorageResourcePreference().getStorageResourceId(); } + public String getOutputStorageResourceId() throws Exception { + if (processModel.getOutputStorageResourceId() != null && !processModel.getOutputStorageResourceId().trim().isEmpty()) { + return processModel.getOutputStorageResourceId(); + } + return getStorageResourceId(); + } + + public StoragePreference getOutputGatewayStorageResourcePreference() throws Exception { + String outputStorageId = getOutputStorageResourceId(); + try { + return registryClient.getGatewayStoragePreference(gatewayId, outputStorageId); + } catch (TException e) { + logger.error("Failed to fetch gateway storage preference for output storage {} in gateway {}", outputStorageId, gatewayId, e); + throw e; + } + } + + public StorageResourceDescription getOutputStorageResourceDescription() throws Exception { + return registryClient.getStorageResource(getOutputStorageResourceId()); + } + private ComputationalResourceSchedulingModel getProcessCRSchedule() { if (getProcessModel() != null) { return getProcessModel().getProcessResourceSchedule(); diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java index 6295ddb7bf..3116f18fda 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java @@ -30,6 +30,7 @@ import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.AgentException; import org.apache.airavata.agents.api.FileMetadata; import org.apache.airavata.agents.api.StorageResourceAdaptor; +import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; import org.apache.airavata.agents.streaming.TransferResult; import org.apache.airavata.agents.streaming.VirtualStreamProducer; import org.apache.airavata.common.utils.ServerSettings; @@ -102,6 +103,42 @@ public abstract class DataStagingTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") + protected StorageResourceAdaptor getOutputStorageAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { + String storageId = null; + try { + storageId = getTaskContext().getOutputStorageResourceId(); + + if (getTaskContext().getProcessModel().getOutputStorageResourceId() != null + && !getTaskContext().getProcessModel().getOutputStorageResourceId().trim().isEmpty()) { + + StoragePreference outputStoragePref = getTaskContext().getOutputGatewayStorageResourcePreference(); + + StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor( + getGatewayId(), + storageId, + getTaskContext().getDataMovementProtocol(), + outputStoragePref.getResourceSpecificCredentialStoreToken() != null + ? outputStoragePref.getResourceSpecificCredentialStoreToken() + : getTaskContext().getGatewayResourceProfile().getCredentialStoreToken(), + outputStoragePref.getLoginUserName()); + + if (storageResourceAdaptor == null) { + throw new TaskOnFailException( + "Output storage resource adaptor for " + storageId + " can not be null", + true, null); + } + return storageResourceAdaptor; + } else { + // Fall back to default storage resource configured + return getStorageAdaptor(adaptorSupport); + } + } catch (Exception e) { + logger.error("Failed to obtain adaptor for output storage resource " + storageId + " in task " + getTaskId(), e); + throw new TaskOnFailException( "Failed to obtain adaptor for output storage resource " + storageId + " in task " + getTaskId(), false, e); + } + } + @SuppressWarnings("WeakerAccess") protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { String computeId = null; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java index 19bb9f45e7..ba1e69e0e0 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java @@ -32,6 +32,7 @@ import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.TaskOnFailException; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.OutputDataObjectType; @@ -74,8 +75,8 @@ public class OutputDataStagingTask extends DataStagingTask { throw new TaskOnFailException(message, true, null); } - // Fetch and validate storage resource - StorageResourceDescription storageResource = getStorageResource(); + // Use output storage resource if specified, otherwise fall back to default + StorageResourceDescription storageResource = getTaskContext().getOutputStorageResourceDescription(); // Fetch and validate source and destination URLS URI sourceURI; @@ -90,13 +91,13 @@ public class OutputDataStagingTask extends DataStagingTask { sourceURI.getPath().length()); if (dataStagingTaskModel.getDestination().startsWith("dummy")) { - - String inputPath = getTaskContext().getStorageFileSystemRootLocation(); + StoragePreference outputStoragePref = getTaskContext().getOutputGatewayStorageResourcePreference(); + String inputPath = outputStoragePref.getFileSystemRootLocation(); String destFilePath = buildDestinationFilePath(inputPath, sourceFileName); destinationURI = new URI( "file", - getTaskContext().getStorageResourceLoginUserName(), + outputStoragePref.getLoginUserName(), storageResource.getHostName(), 22, destFilePath, @@ -117,7 +118,7 @@ public class OutputDataStagingTask extends DataStagingTask { } // Fetch and validate storage adaptor - StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport()); + StorageResourceAdaptor storageResourceAdaptor = getOutputStorageAdaptor(taskHelper.getAdaptorSupport()); // Fetch and validate compute resource adaptor AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport()); diff --git a/airavata-api/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java b/airavata-api/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java index 73555e6c5b..1f5ab72e81 100644 --- a/airavata-api/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java +++ b/airavata-api/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java @@ -93,6 +93,7 @@ public class ExperimentModelUtil { UserConfigurationDataModel configData = experiment.getUserConfigurationData(); if (configData != null) { processModel.setStorageResourceId(configData.getStorageId()); + processModel.setOutputStorageResourceId(configData.getOutputStorageResourceId()); processModel.setExperimentDataDir(configData.getExperimentDataDir()); processModel.setGenerateCert(configData.isGenerateCert()); processModel.setUserDn(configData.getUserDN()); diff --git a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ProcessEntity.java b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ProcessEntity.java index 4e7a458cb4..3c57967d20 100644 --- a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ProcessEntity.java +++ b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ProcessEntity.java @@ -94,6 +94,9 @@ public class ProcessEntity implements Serializable { @Column(name = "USE_USER_CR_PREF") private boolean useUserCRPref; + @Column(name = "OUTPUT_STORAGE_RESOURCE_ID") + private String outputStorageResourceId; + @OneToMany( targetEntity = ProcessStatusEntity.class, cascade = CascadeType.ALL, @@ -365,4 +368,12 @@ public class ProcessEntity implements Serializable { public void setExperiment(ExperimentEntity experiment) { this.experiment = experiment; } + + public String getOutputStorageResourceId() { + return outputStorageResourceId; + } + + public void setOutputStorageResourceId(String outputStorageResourceId) { + this.outputStorageResourceId = outputStorageResourceId; + } } diff --git a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/UserConfigurationDataEntity.java b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/UserConfigurationDataEntity.java index 1e68da78df..0d06a3100c 100644 --- a/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/UserConfigurationDataEntity.java +++ b/airavata-api/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/UserConfigurationDataEntity.java @@ -98,6 +98,9 @@ public class UserConfigurationDataEntity implements Serializable { @Column(name = "IS_USE_USER_CR_PREF") private boolean useUserCRPref; + @Column(name = "OUTPUT_STORAGE_RESOURCE_ID") + private String outputStorageResourceId; + @OneToOne(targetEntity = ExperimentEntity.class, cascade = CascadeType.ALL) @PrimaryKeyJoinColumn(name = "EXPERIMENT_ID", referencedColumnName = "EXPERIMENT_ID") private ExperimentEntity experiment; @@ -303,4 +306,12 @@ public class UserConfigurationDataEntity implements Serializable { List<ComputationalResourceSchedulingEntity> autoScheduledCompResourceSchedulingList) { this.autoScheduledCompResourceSchedulingList = autoScheduledCompResourceSchedulingList; } + + public String getOutputStorageResourceId() { + return outputStorageResourceId; + } + + public void setOutputStorageResourceId(String outputStorageResourceId) { + this.outputStorageResourceId = outputStorageResourceId; + } } diff --git a/thrift-interface-descriptions/data-models/experiment_model.thrift b/thrift-interface-descriptions/data-models/experiment_model.thrift index 0d31bb033d..5134b8cfcb 100644 --- a/thrift-interface-descriptions/data-models/experiment_model.thrift +++ b/thrift-interface-descriptions/data-models/experiment_model.thrift @@ -70,6 +70,7 @@ struct UserConfigurationDataModel { 10: optional bool useUserCRPref; 11: optional string groupResourceProfileId 12: optional list<scheduling_model.ComputationalResourceSchedulingModel> autoScheduledCompResourceSchedulingList, + 13: optional string outputStorageResourceId; } /** diff --git a/thrift-interface-descriptions/data-models/process_model.thrift b/thrift-interface-descriptions/data-models/process_model.thrift index a47e07aae8..6006f85965 100644 --- a/thrift-interface-descriptions/data-models/process_model.thrift +++ b/thrift-interface-descriptions/data-models/process_model.thrift @@ -73,4 +73,5 @@ struct ProcessModel { 24: optional bool useUserCRPref, 25: optional string groupResourceProfileId; 26: optional list<ProcessWorkflow> processWorkflows; + 27: optional string outputStorageResourceId; }
