This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch expe-data-path-fix in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 4d1e0713141b27c0efbedd642feadd26a9c631cc Author: DImuthuUpe <[email protected]> AuthorDate: Tue Nov 11 04:58:32 2025 -0500 Making experiment data path relative when the destination and source storage are different. --- .../helix/impl/task/staging/DataStagingTask.java | 1 + .../impl/task/staging/OutputDataStagingTask.java | 4 +- .../airavata_experiments/airavata.py | 61 ++++++++++++---------- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java index 568dc82301..6d79ee039d 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java @@ -202,6 +202,7 @@ public abstract class DataStagingTask extends AiravataTask { inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator); String experimentDataDir = getProcessModel().getExperimentDataDir(); String filePath; + // TODO : This logic is extremely dangerous. This was implemented expecting the input and output storage are same. if (experimentDataDir != null && !experimentDataDir.isEmpty()) { if (!experimentDataDir.endsWith(File.separator)) { experimentDataDir += File.separator; diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java index ba1e69e0e0..e5b2b12d82 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java @@ -94,7 +94,7 @@ public class OutputDataStagingTask extends DataStagingTask { StoragePreference outputStoragePref = getTaskContext().getOutputGatewayStorageResourcePreference(); String inputPath = outputStoragePref.getFileSystemRootLocation(); String destFilePath = buildDestinationFilePath(inputPath, sourceFileName); - + logger.info("Output storage path for task id " + getTaskId() + " is " + destFilePath); destinationURI = new URI( "file", outputStoragePref.getLoginUserName(), @@ -106,6 +106,8 @@ public class OutputDataStagingTask extends DataStagingTask { } else { destinationURI = new URI(dataStagingTaskModel.getDestination()); + logger.info("Output data staging destination for task id {} is {}", getTaskId(), destinationURI.getPath()); + } if (logger.isDebugEnabled()) { diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py index 24961dcfc1..fa596feeaa 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py @@ -63,7 +63,7 @@ class AiravataOperator: input_file_name: str, uploaded_storage_path: str, ) -> str: - + dataProductModel = DataProductModel( gatewayId=gateway_id, ownerName=self.user_id, @@ -88,7 +88,7 @@ class AiravataOperator: description: str, gateway_id: str, ) -> ExperimentModel: - + execution_id = self.get_app_interface_id(application_name) project_id = self.get_project_id(project_name) return ExperimentModel( @@ -100,13 +100,13 @@ class AiravataOperator: experimentType=ExperimentType.SINGLE_APPLICATION, executionId=execution_id ) - + def get_resource_host_id(self, resource_name): resources = self.api_server_client.get_all_compute_resource_names(self.airavata_token) resource_id = next((k for k in resources if k.startswith(resource_name)), None) assert resource_id is not None, f"Compute resource {resource_name} not found" return resource_id - + def configure_computation_resource_scheduling( self, experiment_model: ExperimentModel, @@ -154,19 +154,19 @@ class AiravataOperator: def default_gateway_id(self): return self.settings.GATEWAY_ID - + def default_gateway_data_store_dir(self): return self.settings.GATEWAY_DATA_STORE_DIR - + def default_sftp_port(self): return self.settings.SFTP_PORT - + def default_sr_hostname(self): return self.settings.STORAGE_RESOURCE_HOST - + def connection_svc_url(self): return f"{self.settings.API_SERVER_URL}/api/v1" - + def filemgr_svc_url(self): return self.settings.FILE_SVC_URL @@ -186,7 +186,7 @@ class AiravataOperator: """ return self.api_server_client.get_experiment(self.airavata_token, experiment_id) - + def get_process_id(self, experiment_id: str) -> str: """ Get process id by experiment id @@ -247,7 +247,7 @@ class AiravataOperator: grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group), None) assert grp_id is not None, f"Group resource profile {group} not found" return str(grp_id) - + def get_group_resource_profile(self, group_id: str): grp = self.api_server_client.get_group_resource_profile(self.airavata_token, group_id) # type: ignore return grp @@ -327,7 +327,7 @@ class AiravataOperator: paths = sftp_connector.put(local_files, remote_dir) logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", remote_dir) return paths - + # step = post-staging file upload elif process_id is not None and agent_ref is not None: assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}" @@ -362,7 +362,7 @@ class AiravataOperator: # step = unknown else: raise ValueError("Invalid arguments for upload_files") - + # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" file = local_files[0] @@ -445,7 +445,7 @@ class AiravataOperator: f.write(content) return path.as_posix() time.sleep(1) - + # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" url_path = os.path.join(process_id, remote_file) @@ -621,6 +621,9 @@ class AiravataOperator: experiment_name=experiment_name, ) abs_path = (mount_point / exp_dir.lstrip("/")).as_posix().rstrip("/") + "/" + if input_sr_id != output_sr_id: + abs_path = self.user_id + "/" + project + "/" + experiment_name + print("[AV] exp_dir:", exp_dir) print("[AV] abs_path:", abs_path) @@ -641,7 +644,7 @@ class AiravataOperator: def register_input_file(file: Path) -> str: return str(self.register_input_file(file.name, input_sr_host, input_sr_id, gateway_id, file.name, abs_path)) - + # set up experiment inputs print("[AV] Setting up experiment inputs...") files_to_upload = list[Path]() @@ -716,16 +719,16 @@ class AiravataOperator: except Exception as status_err: if "FAILED" in str(status_err) or "terminal state" in str(status_err): raise status_err - + try: process_id = self.get_process_id(ex_id) except: pass - + if process_id is None: time.sleep(2) wait_count_process += 1 - + if process_id is None: raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for process to begin") print(f"[AV] Experiment {experiment_name} EXECUTING with pid: {process_id}") @@ -746,16 +749,16 @@ class AiravataOperator: except Exception as status_err: if "FAILED" in str(status_err) or "terminal state" in str(status_err): raise status_err - + try: job_id, job_state = self.get_task_status(ex_id) except: pass - + if job_id in [None, "N/A"]: time.sleep(2) wait_count_task += 1 - + if job_id in [None, "N/A"]: raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for task to begin") assert job_state is not None, f"Job state is None for job id: {job_id}" @@ -781,7 +784,7 @@ class AiravataOperator: status = self.api_server_client.terminate_experiment( self.airavata_token, experiment_id, self.default_gateway_id()) return status - + def execute_py(self, project: str, libraries: list[str], code: str, agent_id: str, pid: str, runtime_args: dict, cold_start: bool = True) -> str | None: # lambda to send request try: @@ -817,7 +820,7 @@ class AiravataOperator: if data["agentUp"]: break time.sleep(1) - + print(f"[av] Agent {agent_id} found! creating environment...") res = requests.post(f"{self.connection_svc_url()}/setup/env", json={ "agentId": agent_id, @@ -853,18 +856,18 @@ class AiravataOperator: response = str(data["responseString"]) break time.sleep(1) - + print(f"[av] Agent {agent_id} code executed! response: {response}") return response - + except Exception as e: print(f"[av] Remote execution failed! {e}") return None - + def get_available_groups(self, gateway_id: str = "default"): grps: list[GroupResourceProfile] = self.api_server_client.get_group_resource_list(self.airavata_token, gatewayId=gateway_id) return grps - + def get_available_runtimes(self, group: str, gateway_id: str = "default"): grps = self.get_available_groups(gateway_id) grp_id, gcr_prefs, gcr_policies = next(((x.groupResourceProfileId, x.computePreferences, x.computeResourcePolicies) for x in grps if str(x.groupResourceProfileName).strip() == group.strip()), (None, None, None)) @@ -896,13 +899,13 @@ class AiravataOperator: ) runtimes.append(runtime) return runtimes - + def get_task_status(self, experiment_id: str) -> tuple[str, JobState]: job_details: dict[str, JobStatus] = self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) # type: ignore job_id = job_state = None for job_id, v in job_details.items(): job_state = v.jobState return job_id or "N/A", job_state or JobState.UNKNOWN - + JobState = JobState
