This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch expe-data-path-fix
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 4d1e0713141b27c0efbedd642feadd26a9c631cc
Author: DImuthuUpe <[email protected]>
AuthorDate: Tue Nov 11 04:58:32 2025 -0500

    Making the experiment data path relative when the destination and source storage are different.
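
    A minimal standalone sketch of the new path selection (hypothetical helper; names mirror the airavata.py change below):

        from pathlib import Path

        def resolve_experiment_data_dir(mount_point: Path, exp_dir: str, user_id: str,
                                        project: str, experiment_name: str,
                                        input_sr_id: str, output_sr_id: str) -> str:
            # Same input and output storage resource: keep the absolute path under the mount point.
            abs_path = (mount_point / exp_dir.lstrip("/")).as_posix().rstrip("/") + "/"
            # Different storage resources: fall back to a relative path so output staging
            # can resolve it against its own storage root.
            if input_sr_id != output_sr_id:
                abs_path = user_id + "/" + project + "/" + experiment_name
            return abs_path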
---
 .../helix/impl/task/staging/DataStagingTask.java   |  1 +
 .../impl/task/staging/OutputDataStagingTask.java   |  4 +-
 .../airavata_experiments/airavata.py               | 61 ++++++++++++----------
 3 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index 568dc82301..6d79ee039d 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -202,6 +202,7 @@ public abstract class DataStagingTask extends AiravataTask {
         inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
         String experimentDataDir = getProcessModel().getExperimentDataDir();
         String filePath;
+        // TODO : This logic is extremely dangerous; it was implemented assuming the input and output storage are the same.
         if (experimentDataDir != null && !experimentDataDir.isEmpty()) {
             if (!experimentDataDir.endsWith(File.separator)) {
                 experimentDataDir += File.separator;
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index ba1e69e0e0..e5b2b12d82 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -94,7 +94,7 @@ public class OutputDataStagingTask extends DataStagingTask {
                     StoragePreference outputStoragePref = getTaskContext().getOutputGatewayStorageResourcePreference();
                     String inputPath = outputStoragePref.getFileSystemRootLocation();
                     String destFilePath = buildDestinationFilePath(inputPath, sourceFileName);
-
+                    logger.info("Output storage path for task id " + getTaskId() + " is " + destFilePath);
                     destinationURI = new URI(
                             "file",
                             outputStoragePref.getLoginUserName(),
@@ -106,6 +106,8 @@ public class OutputDataStagingTask extends DataStagingTask {
 
                 } else {
                     destinationURI = new URI(dataStagingTaskModel.getDestination());
+                    logger.info("Output data staging destination for task id {} is {}", getTaskId(), destinationURI.getPath());
+
                 }
 
                 if (logger.isDebugEnabled()) {
diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
index 24961dcfc1..fa596feeaa 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
@@ -63,7 +63,7 @@ class AiravataOperator:
       input_file_name: str,
       uploaded_storage_path: str,
   ) -> str:
-    
+
     dataProductModel = DataProductModel(
       gatewayId=gateway_id,
       ownerName=self.user_id,
@@ -88,7 +88,7 @@ class AiravataOperator:
       description: str,
       gateway_id: str,
   ) -> ExperimentModel:
-    
+
     execution_id = self.get_app_interface_id(application_name)
     project_id = self.get_project_id(project_name)
     return ExperimentModel(
@@ -100,13 +100,13 @@ class AiravataOperator:
       experimentType=ExperimentType.SINGLE_APPLICATION,
       executionId=execution_id
     )
-  
+
   def get_resource_host_id(self, resource_name):
     resources = self.api_server_client.get_all_compute_resource_names(self.airavata_token)
     resource_id = next((k for k in resources if k.startswith(resource_name)), None)
     assert resource_id is not None, f"Compute resource {resource_name} not found"
     return resource_id
-  
+
   def configure_computation_resource_scheduling(
       self,
       experiment_model: ExperimentModel,
@@ -154,19 +154,19 @@ class AiravataOperator:
 
   def default_gateway_id(self):
     return self.settings.GATEWAY_ID
-  
+
   def default_gateway_data_store_dir(self):
     return self.settings.GATEWAY_DATA_STORE_DIR
-  
+
   def default_sftp_port(self):
     return self.settings.SFTP_PORT
-  
+
   def default_sr_hostname(self):
     return self.settings.STORAGE_RESOURCE_HOST
-  
+
   def connection_svc_url(self):
     return f"{self.settings.API_SERVER_URL}/api/v1"
-  
+
   def filemgr_svc_url(self):
     return self.settings.FILE_SVC_URL
 
@@ -186,7 +186,7 @@ class AiravataOperator:
 
     """
     return self.api_server_client.get_experiment(self.airavata_token, experiment_id)
-  
+
   def get_process_id(self, experiment_id: str) -> str:
     """
     Get process id by experiment id
@@ -247,7 +247,7 @@ class AiravataOperator:
     grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group), None)
     assert grp_id is not None, f"Group resource profile {group} not found"
     return str(grp_id)
-  
+
   def get_group_resource_profile(self, group_id: str):
     grp = self.api_server_client.get_group_resource_profile(self.airavata_token, group_id) # type: ignore
     return grp
@@ -327,7 +327,7 @@ class AiravataOperator:
       paths = sftp_connector.put(local_files, remote_dir)
       logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", 
remote_dir)
       return paths
-    
+
     # step = post-staging file upload
     elif process_id is not None and agent_ref is not None:
       assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}"
@@ -362,7 +362,7 @@ class AiravataOperator:
     # step = unknown
     else:
       raise ValueError("Invalid arguments for upload_files")
-    
+
     # file manager service fallback
     assert process_id is not None, f"Expected process_id, got {process_id}"
     file = local_files[0]
@@ -445,7 +445,7 @@ class AiravataOperator:
             f.write(content)
           return path.as_posix()
         time.sleep(1)
-    
+
     # file manager service fallback
     assert process_id is not None, f"Expected process_id, got {process_id}"
     url_path = os.path.join(process_id, remote_file)
@@ -621,6 +621,9 @@ class AiravataOperator:
         experiment_name=experiment_name,
     )
     abs_path = (mount_point / exp_dir.lstrip("/")).as_posix().rstrip("/") + "/"
+    if input_sr_id != output_sr_id:
+      abs_path = self.user_id + "/" + project + "/" + experiment_name
+
     print("[AV] exp_dir:", exp_dir)
     print("[AV] abs_path:", abs_path)
 
@@ -641,7 +644,7 @@ class AiravataOperator:
 
     def register_input_file(file: Path) -> str:
       return str(self.register_input_file(file.name, input_sr_host, input_sr_id, gateway_id, file.name, abs_path))
-    
+
     # set up experiment inputs
     print("[AV] Setting up experiment inputs...")
     files_to_upload = list[Path]()
@@ -716,16 +719,16 @@ class AiravataOperator:
       except Exception as status_err:
         if "FAILED" in str(status_err) or "terminal state" in str(status_err):
           raise status_err
-      
+
       try:
         process_id = self.get_process_id(ex_id)
       except:
         pass
-      
+
       if process_id is None:
         time.sleep(2)
         wait_count_process += 1
-    
+
     if process_id is None:
       raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for 
process to begin")
     print(f"[AV] Experiment {experiment_name} EXECUTING with pid: 
{process_id}")
@@ -746,16 +749,16 @@ class AiravataOperator:
       except Exception as status_err:
         if "FAILED" in str(status_err) or "terminal state" in str(status_err):
           raise status_err
-      
+
       try:
         job_id, job_state = self.get_task_status(ex_id)
       except:
         pass
-      
+
       if job_id in [None, "N/A"]:
         time.sleep(2)
         wait_count_task += 1
-    
+
     if job_id in [None, "N/A"]:
       raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for task to begin")
     assert job_state is not None, f"Job state is None for job id: {job_id}"
@@ -781,7 +784,7 @@ class AiravataOperator:
     status = self.api_server_client.terminate_experiment(
         self.airavata_token, experiment_id, self.default_gateway_id())
     return status
-  
+
   def execute_py(self, project: str, libraries: list[str], code: str, agent_id: str, pid: str, runtime_args: dict, cold_start: bool = True) -> str | None:
     # lambda to send request
     try:
@@ -817,7 +820,7 @@ class AiravataOperator:
         if data["agentUp"]:
           break
         time.sleep(1)
-      
+
       print(f"[av] Agent {agent_id} found! creating environment...")
       res = requests.post(f"{self.connection_svc_url()}/setup/env", json={
         "agentId": agent_id,
@@ -853,18 +856,18 @@ class AiravataOperator:
           response = str(data["responseString"])
           break
         time.sleep(1)
-      
+
       print(f"[av] Agent {agent_id} code executed! response: {response}")
       return response
-    
+
     except Exception as e:
       print(f"[av] Remote execution failed! {e}")
       return None
-    
+
   def get_available_groups(self, gateway_id: str = "default"):
     grps: list[GroupResourceProfile] = self.api_server_client.get_group_resource_list(self.airavata_token, gatewayId=gateway_id)
     return grps
-  
+
   def get_available_runtimes(self, group: str, gateway_id: str = "default"):
     grps = self.get_available_groups(gateway_id)
     grp_id, gcr_prefs, gcr_policies = next(((x.groupResourceProfileId, x.computePreferences, x.computeResourcePolicies) for x in grps if str(x.groupResourceProfileName).strip() == group.strip()), (None, None, None))
@@ -896,13 +899,13 @@ class AiravataOperator:
         )
         runtimes.append(runtime)
     return runtimes
-  
+
   def get_task_status(self, experiment_id: str) -> tuple[str, JobState]:
     job_details: dict[str, JobStatus] = self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) # type: ignore
     job_id = job_state = None
     for job_id, v in job_details.items():
       job_state = v.jobState
     return job_id or "N/A", job_state or JobState.UNKNOWN
-  
+
   JobState = JobState
 
