This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 47d882e39604600d425d1159a08d1cdd55ca7623
Author: yasith <[email protected]>
AuthorDate: Mon Dec 16 01:00:19 2024 -0600

    add pre-submission validator, update storage dir, take CONNECTION_SVC_URL 
and FILEMGR_SVC_URL from settings.ini, cleanup runtime.py, add fallback apis to 
airavata.py, rearrange files, setup file ul/dl apis, improve ux, update 
notebooks, add agent scripts, refine code, fix bugs
---
 .../airavata-python-sdk/.gitignore                 |   2 +-
 .../airavata_experiments/__init__.py               |   4 +-
 .../airavata_experiments/airavata.py               | 390 ++++++++++++++++-----
 .../airavata_experiments/base.py                   |   6 +-
 .../airavata_experiments/md/applications.py        |  26 +-
 .../airavata_experiments/plan.py                   |  35 +-
 .../airavata_experiments/runtime.py                | 199 +++--------
 .../airavata_experiments/task.py                   |  17 +
 .../airavata-python-sdk/pyproject.toml             |   4 +-
 .../airavata-python-sdk/samples/poc.ipynb          | 384 --------------------
 dev-tools/deployment/scripts/.gitkeep              |   0
 .../deployment/scripts/expanse/alphafold2-agent.sh | 133 +++++++
 dev-tools/deployment/scripts/expanse/namd-agent.sh | 232 ++++++++++++
 .../jupyterhub/data/1_experiment_sdk.ipynb         |  68 ++--
 14 files changed, 787 insertions(+), 713 deletions(-)

diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore
index 2fb5c82ffc..794ce1ca9c 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore
@@ -9,7 +9,7 @@ __pycache__/
 .ipynb_checkpoints
 *.egg-info/
 data/
-results/
+results*/
 plan.json
 settings*.ini
 auth.state
\ No newline at end of file
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py
index c6927de016..f45c820c4f 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py
@@ -16,11 +16,11 @@
 
 from __future__ import annotations
 
-from . import base, md, plan
+from . import base, plan
 from .auth import login, logout
 from .runtime import list_runtimes, Runtime
 
-__all__ = ["login", "logout", "list_runtimes", "base", "md", "plan"]
+__all__ = ["login", "logout", "list_runtimes", "base", "plan"]
 
 def display_runtimes(runtimes: list[Runtime]):
   """
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
index 36c2efbf43..bf778a05df 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
@@ -18,20 +18,29 @@ import logging
 from pathlib import Path
 from typing import Literal, NamedTuple
 from .sftp import SFTPConnector
+import time
+import warnings
+import requests
+from urllib.parse import urlparse
+import uuid
+import os
+import base64
 
 import jwt
 from airavata.model.security.ttypes import AuthzToken
 from airavata.model.experiment.ttypes import ExperimentModel, ExperimentType, 
UserConfigurationDataModel
 from airavata.model.scheduling.ttypes import 
ComputationalResourceSchedulingModel
 from airavata.model.data.replica.ttypes import DataProductModel, 
DataProductType, DataReplicaLocationModel, ReplicaLocationCategory
-
 from airavata_sdk.clients.api_server_client import APIServerClient
 
+warnings.filterwarnings("ignore", category=DeprecationWarning)
 logger = logging.getLogger("airavata_sdk.clients")
 logger.setLevel(logging.INFO)
 
 LaunchState = NamedTuple("LaunchState", [
   ("experiment_id", str),
+  ("agent_ref", str),
+  ("process_id", str),
   ("mount_point", Path),
   ("experiment_dir", str),
   ("sr_host", str),
@@ -49,6 +58,8 @@ class Settings:
     self.API_SERVER_HOST = config.get('APIServer', 'API_HOST')
     self.API_SERVER_PORT = config.getint('APIServer', 'API_PORT')
     self.API_SERVER_SECURE = config.getboolean('APIServer', 'API_SECURE')
+    self.CONNECTION_SVC_URL = config.get('APIServer', 'CONNECTION_SVC_URL')
+    self.FILEMGR_SVC_URL = config.get('APIServer', 'FILEMGR_SVC_URL')
     
     # gateway settings
     self.GATEWAY_ID = config.get('Gateway', 'GATEWAY_ID')
@@ -176,6 +187,12 @@ class AiravataOperator:
   
   def default_project_name(self):
     return self.settings.PROJECT_NAME
+  
+  def connection_svc_url(self):
+    return self.settings.CONNECTION_SVC_URL
+  
+  def filemgr_svc_url(self):
+    return self.settings.FILEMGR_SVC_URL
 
   def __airavata_token__(self, access_token: str, gateway_id: str):
     """
@@ -187,14 +204,22 @@ class AiravataOperator:
     claimsMap = {"userName": self.user_id, "gatewayID": gateway_id}
     return AuthzToken(accessToken=self.access_token, claimsMap=claimsMap)
 
-
   def get_experiment(self, experiment_id: str):
     """
     Get experiment by id
 
     """
     return self.api_server_client.get_experiment(self.airavata_token, 
experiment_id)
+  
+  def get_process_id(self, experiment_id: str) -> str:
+    """
+    Get process id by experiment id
 
+    """
+    tree: any = 
self.api_server_client.get_detailed_experiment_tree(self.airavata_token, 
experiment_id) # type: ignore
+    processModels: list = tree.processes
+    assert len(processModels) == 1, f"Expected 1 process model, got 
{len(processModels)}"
+    return processModels[0].processId
 
   def get_accessible_apps(self, gateway_id: str | None = None):
     """
@@ -207,7 +232,6 @@ class AiravataOperator:
     app_interfaces = 
self.api_server_client.get_all_application_interfaces(self.airavata_token, 
gateway_id)
     return app_interfaces
 
-
   def get_preferred_storage(self, gateway_id: str | None = None, sr_hostname: 
str | None = None):
     """
     Get preferred storage resource
@@ -221,7 +245,6 @@ class AiravataOperator:
     sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname))
     return 
self.api_server_client.get_gateway_storage_preference(self.airavata_token, 
gateway_id, sr_id)
 
-
   def get_storage(self, storage_name: str | None = None) -> any:  # type: 
ignore
     """
     Get storage resource by name
@@ -234,9 +257,6 @@ class AiravataOperator:
     sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name))
     storage = self.api_server_client.get_storage_resource(self.airavata_token, 
sr_id)
     return storage
-  
-
-
 
   def get_group_resource_profile_id(self, grp_name: str | None = None) -> str:
     """
@@ -254,7 +274,6 @@ class AiravataOperator:
     grp: any = 
self.api_server_client.get_group_resource_profile(self.airavata_token, grp_id) 
# type: ignore
     return grp
 
-
   def get_compatible_deployments(self, app_interface_id: str, grp_name: str | 
None = None):
     """
     Get compatible deployments for an application interface and group resource 
profile
@@ -268,7 +287,6 @@ class AiravataOperator:
     deployments = 
self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token,
 app_interface_id, grp_id)
     return deployments
 
-
   def get_app_interface_id(self, app_name: str, gateway_id: str | None = None):
     """
     Get application interface id by name
@@ -278,7 +296,6 @@ class AiravataOperator:
     apps: list = 
self.api_server_client.get_all_application_interfaces(self.airavata_token, 
gateway_id) # type: ignore
     app_id = next((app.applicationInterfaceId for app in apps if 
app.applicationName == app_name))
     return str(app_id)
-  
 
   def get_project_id(self, project_name: str, gateway_id: str | None = None):
     gateway_id = str(gateway_id or self.default_gateway_id())
@@ -286,7 +303,6 @@ class AiravataOperator:
     project_id = next((p.projectID for p in projects if p.name == project_name 
and p.owner == self.user_id))
     return str(project_id)
 
-
   def get_application_inputs(self, app_interface_id: str) -> list:
     """
     Get application inputs by id
@@ -294,7 +310,6 @@ class AiravataOperator:
     """
     return 
list(self.api_server_client.get_application_inputs(self.airavata_token, 
app_interface_id))  # type: ignore
 
-
   def get_compute_resources_by_ids(self, resource_ids: list[str]):
     """
     Get compute resources by ids
@@ -302,7 +317,6 @@ class AiravataOperator:
     """
     return [self.api_server_client.get_compute_resource(self.airavata_token, 
resource_id) for resource_id in resource_ids]
 
-
   def make_experiment_dir(self, sr_host: str, project_name: str, 
experiment_name: str) -> str:
     """
     Make experiment directory on storage resource, and return the remote path
@@ -317,68 +331,191 @@ class AiravataOperator:
     logger.info("Experiment directory created at %s", remote_path)
     return remote_path
 
-
-  def upload_files(self, sr_host: str, local_files: list[Path], remote_dir: 
str) -> list[str]:
+  def upload_files(self, process_id: str | None, agent_ref: str | None, 
sr_host: str, local_files: list[Path], remote_dir: str) -> list[str]:
     """
     Upload local files to a remote directory of a storage resource
+    TODO add data_svc fallback
 
     Return Path: /{project_name}/{experiment_name}
 
     """
-    host = sr_host
-    port = self.default_sftp_port()
-    sftp_connector = SFTPConnector(host=host, port=int(port), 
username=self.user_id, password=self.access_token)
-    paths = sftp_connector.put(local_files, remote_dir)
-    logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", 
remote_dir)
-    return paths
 
+    # step = experiment staging
+    if process_id is None and agent_ref is None:
+      host = sr_host
+      port = self.default_sftp_port()
+      sftp_connector = SFTPConnector(host=host, port=int(port), 
username=self.user_id, password=self.access_token)
+      paths = sftp_connector.put(local_files, remote_dir)
+      logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", 
remote_dir)
+      return paths
+    
+    # step = post-staging file upload
+    elif process_id is not None and agent_ref is not None:
+      assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}"
+      file = local_files[0]
+      fp = os.path.join("/data", file.name)
+      rawdata = file.read_bytes()
+      b64data = base64.b64encode(rawdata).decode()
+      res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+          "agentId": agent_ref,
+          "workingDir": ".",
+          "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"]
+      })
+      data = res.json()
+      if data["error"] is not None:
+        if str(data["error"]) == "Agent not found":
+          port = self.default_sftp_port()
+          sftp_connector = SFTPConnector(host=sr_host, port=int(port), 
username=self.user_id, password=self.access_token)
+          paths = sftp_connector.put(local_files, remote_dir)
+          return paths
+        else:
+          raise Exception(data["error"])
+      else:
+        exc_id = data["executionId"]
+        while True:
+          res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+          data = res.json()
+          if data["available"]:
+            return [fp]
+          time.sleep(1)
+
+    # step = unknown
+    else:
+      raise ValueError("Invalid arguments for upload_files")
+    
+    # file manager service fallback
+    assert process_id is not None, f"Expected process_id, got {process_id}"
+    file = local_files[0]
+    url_path = os.path.join(process_id, file.name)
+    filemgr_svc_upload_url = f"{self.filemgr_svc_url()}/upload/live/{url_path}"
 
-  def list_files(self, sr_host: str, remote_dir: str) -> list[str]:
+  def list_files(self, process_id: str, agent_ref: str, sr_host: str, 
remote_dir: str) -> list[str]:
     """
     List files in a remote directory of a storage resource
+    TODO add data_svc fallback
 
     Return Path: /{project_name}/{experiment_name}
 
     """
-    host = sr_host
-    port = self.default_sftp_port()
-    sftp_connector = SFTPConnector(host=host, port=int(port), 
username=self.user_id, password=self.access_token)
-    return sftp_connector.ls(remote_dir)
-
-
-  def download_file(self, sr_host: str, remote_file: str, local_dir: str) -> 
str:
+    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+        "agentId": agent_ref,
+        "workingDir": ".",
+        "arguments": ["sh", "-c", "cd /data && find . -type f -printf '%P\n'"]
+    })
+    data = res.json()
+    if data["error"] is not None:
+      if str(data["error"]) == "Agent not found":
+        port = self.default_sftp_port()
+        sftp_connector = SFTPConnector(host=sr_host, port=int(port), 
username=self.user_id, password=self.access_token)
+        return sftp_connector.ls(remote_dir)
+      else:
+        raise Exception(data["error"])
+    else:
+      exc_id = data["executionId"]
+      while True:
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        data = res.json()
+        if data["available"]:
+          files = data["responseString"].split("\n")
+          return files
+        time.sleep(1)
+
+    # file manager service fallback
+    assert process_id is not None, f"Expected process_id, got {process_id}"
+    filemgr_svc_ls_url = f"{self.filemgr_svc_url()}/list/live/{process_id}"
+
+  def download_file(self, process_id: str, agent_ref: str, sr_host: str, 
remote_file: str, remote_dir: str, local_dir: str) -> str:
     """
     Download files from a remote directory of a storage resource to a local 
directory
+    TODO add data_svc fallback
 
     Return Path: /{project_name}/{experiment_name}
 
     """
-    host = sr_host
-    port = self.default_sftp_port()
-    sftp_connector = SFTPConnector(host=host, port=int(port), 
username=self.user_id, password=self.access_token)
-    path = sftp_connector.get(remote_file, local_dir)
-    logger.info("Remote files downlaoded to local dir: %s", local_dir)
-    return path
+    import os
+    fp = os.path.join("/data", remote_file)
+    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+        "agentId": agent_ref,
+        "workingDir": ".",
+        "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
+    })
+    data = res.json()
+    if data["error"] is not None:
+      if str(data["error"]) == "Agent not found":
+        port = self.default_sftp_port()
+        fp = os.path.join(remote_dir, remote_file)
+        sftp_connector = SFTPConnector(host=sr_host, port=int(port), 
username=self.user_id, password=self.access_token)
+        path = sftp_connector.get(fp, local_dir)
+        return path
+      else:
+        raise Exception(data["error"])
+    else:
+      exc_id = data["executionId"]
+      while True:
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        data = res.json()
+        if data["available"]:
+          content = data["responseString"]
+          import base64
+          content = base64.b64decode(content)
+          path =  Path(local_dir) / remote_file
+          with open(path, "wb") as f:
+            f.write(content)
+          return path.as_posix()
+        time.sleep(1)
+    
+    # file manager service fallback
+    assert process_id is not None, f"Expected process_id, got {process_id}"
+    url_path = os.path.join(process_id, remote_file)
+    filemgr_svc_download_url = 
f"{self.filemgr_svc_url()}/download/live/{url_path}"
   
-  def cat_file(self, sr_host: str, remote_file: str) -> bytes:
+  def cat_file(self, process_id: str, agent_ref: str, sr_host: str, 
remote_file: str, remote_dir: str) -> bytes:
     """
     Download files from a remote directory of a storage resource to a local 
directory
+    TODO add data_svc fallback
 
     Return Path: /{project_name}/{experiment_name}
 
     """
-    host = sr_host
-    port = self.default_sftp_port()
-    sftp_connector = SFTPConnector(host=host, port=int(port), 
username=self.user_id, password=self.access_token)
-    data = sftp_connector.cat(remote_file)
-    logger.info("Remote files downlaoded to local dir: %s bytes", len(data))
-    return data
+    import os
+    fp = os.path.join("/data", remote_file)
+    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+        "agentId": agent_ref,
+        "workingDir": ".",
+        "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
+    })
+    data = res.json()
+    if data["error"] is not None:
+      if str(data["error"]) == "Agent not found":
+        port = self.default_sftp_port()
+        fp = os.path.join(remote_dir, remote_file)
+        sftp_connector = SFTPConnector(host=sr_host, port=int(port), 
username=self.user_id, password=self.access_token)
+        data = sftp_connector.cat(fp)
+        return data
+      else:
+        raise Exception(data["error"])
+    else:
+      exc_id = data["executionId"]
+      while True:
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        data = res.json()
+        if data["available"]:
+          content = data["responseString"]
+          import base64
+          content = base64.b64decode(content)
+          return content
+        time.sleep(1)
+
+    # file manager service fallback
+    assert process_id is not None, f"Expected process_id, got {process_id}"
+    url_path = os.path.join(process_id, remote_file)
+    filemgr_svc_download_url = 
f"{self.filemgr_svc_url()}/download/live/{url_path}"
 
   def launch_experiment(
       self,
       experiment_name: str,
       app_name: str,
-      inputs: dict[str, str | int | float | list[str]],
+      inputs: dict[str, dict[str, str | int | float | list[str]]],
       computation_resource_name: str,
       queue_name: str,
       node_count: int,
@@ -402,24 +539,43 @@ class AiravataOperator:
     sr_host = str(sr_host or self.default_sr_hostname())
     mount_point = Path(self.default_gateway_data_store_dir()) / self.user_id
     project_name = str(project_name or self.default_project_name())
+    agent_ref = str(uuid.uuid4())
+    server_url = urlparse(self.connection_svc_url()).netloc
 
     # validate args (str)
     print("[AV] Validating args...")
-    assert len(experiment_name) > 0
-    assert len(app_name) > 0
-    assert len(computation_resource_name) > 0
-    assert len(inputs) > 0
-    assert len(gateway_id) > 0
-    assert len(queue_name) > 0
-    assert len(grp_name) > 0
-    assert len(sr_host) > 0
-    assert len(project_name) > 0
-    assert len(mount_point.as_posix()) > 0
+    assert len(experiment_name) > 0, f"Invalid experiment_name: 
{experiment_name}"
+    assert len(app_name) > 0, f"Invalid app_name: {app_name}"
+    assert len(computation_resource_name) > 0, f"Invalid 
computation_resource_name: {computation_resource_name}"
+    assert len(inputs) > 0, f"Invalid inputs: {inputs}"
+    assert len(gateway_id) > 0, f"Invalid gateway_id: {gateway_id}"
+    assert len(queue_name) > 0, f"Invalid queue_name: {queue_name}"
+    assert len(grp_name) > 0, f"Invalid grp_name: {grp_name}"
+    assert len(sr_host) > 0, f"Invalid sr_host: {sr_host}"
+    assert len(project_name) > 0, f"Invalid project_name: {project_name}"
+    assert len(mount_point.as_posix()) > 0, f"Invalid mount_point: 
{mount_point}"
 
     # validate args (int)
-    assert node_count > 0
-    assert cpu_count > 0
-    assert walltime > 0
+    assert node_count > 0, f"Invalid node_count: {node_count}"
+    assert cpu_count > 0, f"Invalid cpu_count: {cpu_count}"
+    assert walltime > 0, f"Invalid walltime: {walltime}"
+
+    # parse and validate inputs
+    file_inputs = dict[str, Path | list[Path]]()
+    data_inputs = dict[str, str | int | float]()
+    for input_name, input_spec in inputs.items():
+      input_type = input_spec["type"]
+      input_value = input_spec["value"]
+      if input_type == "uri":
+        assert isinstance(input_value, str) and 
os.path.isfile(str(input_value)), f"Invalid {input_name}: {input_value}"
+        file_inputs[input_name] = Path(input_value)
+      elif input_type == "uri[]":
+        assert isinstance(input_value, list) and all([os.path.isfile(str(v)) 
for v in input_value]), f"Invalid {input_name}: {input_value}"
+        file_inputs[input_name] = [Path(v) for v in input_value]
+      else:
+        assert isinstance(input_value, (int, float, str)), f"Invalid 
{input_name}: {input_value}"
+        data_inputs[input_name] = input_value
+    data_inputs.update({"agent_id": agent_ref, "server_url": server_url})
 
     # setup runtime params
     print("[AV] Setting up runtime params...")
@@ -429,7 +585,7 @@ class AiravataOperator:
     # setup application interface
     print("[AV] Setting up application interface...")
     app_interface_id = self.get_app_interface_id(app_name)
-    assert app_interface_id is not None
+    assert app_interface_id is not None, f"Invalid app_interface_id: 
{app_interface_id}"
 
     # setup experiment
     print("[AV] Setting up experiment...")
@@ -464,45 +620,37 @@ class AiravataOperator:
         auto_schedule=auto_schedule,
     )
 
-    # set up file inputs
-    print("[AV] Setting up file inputs...")
-
     def register_input_file(file: Path) -> str:
       return str(self.register_input_file(file.name, sr_host, sr_id, 
gateway_id, file.name, abs_path))
-
-    # setup experiment inputs
+    
+    # set up file inputs
+    print("[AV] Setting up file inputs...")
     files_to_upload = list[Path]()
-    file_inputs = dict[str, str | list[str]]()
-    data_inputs = dict[str, str | list[str] | int | float]()
-    for key, value in inputs.items():
-
-      if isinstance(value, str) and Path(value).is_file():
-        file = Path(value)
-        files_to_upload.append(file)
-        file_inputs[key] = register_input_file(file)
-
-      elif isinstance(value, list) and all([isinstance(v, str) and 
Path(v).is_file() for v in value]):
-        files = [*map(Path, value)]
-        files_to_upload.extend(files)
-        file_inputs[key] = [*map(register_input_file, files)]
-
+    file_refs = dict[str, str | list[str]]()
+    for key, value in file_inputs.items():
+      if isinstance(value, Path):
+        files_to_upload.append(value)
+        file_refs[key] = register_input_file(value)
+      elif isinstance(value, list):
+        assert all([isinstance(v, Path) for v in value]), f"Invalid file input 
value: {value}"
+        files_to_upload.extend(value)
+        file_refs[key] = [*map(register_input_file, value)]
       else:
-        data_inputs[key] = value
-
-    # configure file inputs for experiment
-    print(f"[AV] Uploading {len(files_to_upload)} file inputs for 
experiment...")
-    self.upload_files(storage.hostName, files_to_upload, exp_dir)
+        raise ValueError("Invalid file input type")
 
     # configure experiment inputs
     experiment_inputs = []
     for exp_input in 
self.api_server_client.get_application_inputs(self.airavata_token, 
app_interface_id):  # type: ignore
       if exp_input.type < 3 and exp_input.name in data_inputs:
         value = data_inputs[exp_input.name]
-        exp_input.value = repr(value)
-      elif exp_input.type == 3 and exp_input.name in file_inputs:
-        exp_input.value = file_inputs[exp_input.name]
-      elif exp_input.type == 4 and exp_input.name in file_inputs:
-        exp_input.value = ','.join(file_inputs[exp_input.name])
+        if exp_input.type == 0:
+          exp_input.value = str(value)
+        else:
+          exp_input.value = repr(value)
+      elif exp_input.type == 3 and exp_input.name in file_refs:
+        exp_input.value = file_refs[exp_input.name]
+      elif exp_input.type == 4 and exp_input.name in file_refs:
+        exp_input.value = ','.join(file_refs[exp_input.name])
       experiment_inputs.append(exp_input)
     experiment.experimentInputs = experiment_inputs
 
@@ -510,31 +658,81 @@ class AiravataOperator:
     outputs = 
self.api_server_client.get_application_outputs(self.airavata_token, 
app_interface_id)
     experiment.experimentOutputs = outputs
 
+    # upload file inputs for experiment
+    print(f"[AV] Uploading {len(files_to_upload)} file inputs for 
experiment...")
+    self.upload_files(None, None, storage.hostName, files_to_upload, exp_dir)
+
     # create experiment
     ex_id = self.api_server_client.create_experiment(self.airavata_token, 
gateway_id, experiment)
-
-    # TODO agent_id generate and send as input parameter
-    # connect to connection service after this point, and route all 
file-related requests through it
-    # later build a ssh adapter for ls type tasks
+    ex_id = str(ex_id)
+    print(f"[AV] Experiment {experiment_name} CREATED with id: {ex_id}")
 
     # launch experiment
     self.api_server_client.launch_experiment(self.airavata_token, ex_id, 
gateway_id)
+    print(f"[AV] Experiment {experiment_name} STARTED with id: {ex_id}")
+
+    # get process id
+    print(f"[AV] Experiment {experiment_name} WAITING until experiment 
begins...")
+    process_id = None
+    while process_id is None:
+      try:
+        process_id = self.get_process_id(ex_id)
+      except:
+        time.sleep(2)
+      else:
+        time.sleep(2)
+    print(f"[AV] Experiment {experiment_name} EXECUTING with pid: 
{process_id}")
 
     return LaunchState(
-      experiment_id=str(ex_id),
+      experiment_id=ex_id,
+      agent_ref=agent_ref,
+      process_id=process_id,
       mount_point=mount_point,
       experiment_dir=exp_dir,
-      sr_host=str(storage.hostName),
+      sr_host=storage.hostName,
     )
 
-
-  def get_experiment_status(self, experiment_id) -> Literal["CREATED", 
"VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", 
"COMPLETED", "FAILED"]:
+  def get_experiment_status(self, experiment_id: str) -> Literal["CREATED", 
"VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", 
"COMPLETED", "FAILED"]:
     states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", 
"CANCELING", "CANCELED", "COMPLETED", "FAILED"]
     status: any = 
self.api_server_client.get_experiment_status(self.airavata_token, 
experiment_id) # type: ignore
     return states[status.state]
-  
 
-  def stop_experiment(self, experiment_id):
+  def stop_experiment(self, experiment_id: str):
     status = self.api_server_client.terminate_experiment(
         self.airavata_token, experiment_id, self.default_gateway_id())
     return status
+  
+  def execute_py(self, libraries: list[str], code: str, agent_ref: str) -> str 
| None:
+    print(f"[av] Executing Python Code...")
+    try:
+      res = 
requests.post(f"{self.connection_svc_url()}/agent/executepythonrequest", json={
+          "libraries": libraries,
+          "code": code,
+          "pythonVersion": "3.10", # TODO verify
+          "keepAlive": False, # TODO verify
+          "parentExperimentId": "/data", # the working directory
+          "agentId": agent_ref,
+      })
+      data = res.json()
+      if data["error"] is not None:
+        raise Exception(data["error"])
+      else:
+        exc_id = data["executionId"]
+        while True:
+          res = 
requests.get(f"{self.connection_svc_url()}/agent/executepythonresponse/{exc_id}")
+          data = res.json()
+          if data["available"]:
+            response = str(data["responseString"])
+            return response
+          time.sleep(1)
+    except Exception as e:
+      print("[av] Remote execution failed! {e}")
+      return None
+    
+  def get_available_runtimes(self):
+    from .runtime import Remote
+    return [
+      Remote(cluster="login.expanse.sdsc.edu", category="gpu", 
queue_name="gpu-shared", node_count=1, cpu_count=10, walltime=30),
+      Remote(cluster="login.expanse.sdsc.edu", category="cpu", 
queue_name="shared", node_count=1, cpu_count=10, walltime=30),
+      Remote(cluster="anvil.rcac.purdue.edu", category="cpu", 
queue_name="shared", node_count=1, cpu_count=24, walltime=30),
+    ]
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py
index 8fe1865d1b..1967cf6f73 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py
@@ -65,7 +65,7 @@ class Experiment(Generic[T], abc.ABC):
   name: str
   application: T
   inputs: dict[str, Any]
-  input_mapping: dict[str, str]
+  input_mapping: dict[str, tuple[Any, str]]
   resource: Runtime = Runtime.default()
   tasks: list[Task] = []
 
@@ -115,7 +115,7 @@ class Experiment(Generic[T], abc.ABC):
 
       task_specific_params = dict(zip(space.keys(), values))
       agg_inputs = {**self.inputs, **task_specific_params}
-      task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()}
+      task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in 
self.input_mapping.items()}
 
       self.tasks.append(Task(
           name=f"{self.name}_{uuid_str}",
@@ -130,6 +130,6 @@ class Experiment(Generic[T], abc.ABC):
     tasks = []
     for t in self.tasks:
       agg_inputs = {**self.inputs, **t.inputs}
-      task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()}
+      task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in 
self.input_mapping.items()}
       tasks.append(Task(name=t.name, app_id=self.application.app_id, 
inputs=task_inputs, runtime=t.runtime))
     return Plan(tasks=tasks)
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py
index e5e30c92b2..b68b1f7acb 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py
@@ -58,19 +58,19 @@ class NAMD(ExperimentApp):
         num_replicas=num_replicas,
     )
     obj.input_mapping = {
-        "MD-Instructions-Input": "config_file",  # uri? [REQUIRED]
-        "Coordinates-PDB-File": "pdb_file",  # uri? [OPTIONAL]
-        "Protein-Structure-File_PSF": "psf_file",  # uri? [REQUIRED]
-        "FF-Parameter-Files": "ffp_files",  # uri[]? [REQUIRED]
-        "Execution_Type": "parallelism",  # "CPU" | "GPU" [REQUIRED]
-        "Optional_Inputs": "other_files",  # uri[]? [OPTIONAL]
-        "Number of Replicas": "num_replicas",  # integer [REQUIRED]
-        # "Constraints-PDB": "pdb_file",  # uri? [OPTIONAL]
-        # "Replicate": None,  # "yes"? [OPTIONAL]
-        # "Continue_from_Previous_Run?": None,  # "yes"? [OPTIONAL]
-        # "Previous_JobID": None,  # string? [OPTIONAL] [show if 
"Continue_from_Previous_Run?" == "yes"]
-        # "GPU Resource Warning": None,  # string? [OPTIONAL] [show if 
"Continue_from_Previous_Run?" == "yes"]
-        # "Restart_Replicas_List": None,  # string [OPTIONAL] [show if 
"Continue_from_Previous_Run?" == "yes"]
+        "MD-Instructions-Input": ("config_file", "uri"),  # uri? [REQUIRED]
+        "Coordinates-PDB-File": ("pdb_file", "uri"),  # uri? [OPTIONAL]
+        "Protein-Structure-File_PSF": ("psf_file", "uri"),  # uri? [REQUIRED]
+        "FF-Parameter-Files": ("ffp_files", "uri[]"),  # uri[]? [REQUIRED]
+        "Execution_Type": ("parallelism", "str"),  # "CPU" | "GPU" [REQUIRED]
+        "Optional_Inputs": ("other_files", "uri[]"),  # uri[]? [OPTIONAL]
+        "Number of Replicas": ("num_replicas", "str"),  # integer [REQUIRED]
+        # "Constraints-PDB": ("pdb_file", "uri"),  # uri? [OPTIONAL]
+        # "Replicate": (None, "str"),  # "yes"? [OPTIONAL]
+        # "Continue_from_Previous_Run?": (None, "str"),  # "yes"? [OPTIONAL]
+        # "Previous_JobID": (None, "str"),  # string? [OPTIONAL] [show if 
"Continue_from_Previous_Run?" == "yes"]
+        # "GPU Resource Warning": (None, "str"),  # string? [OPTIONAL] [show 
if "Continue_from_Previous_Run?" == "yes"]
+        # "Restart_Replicas_List": (None, "str[]"),  # string [OPTIONAL] [show 
if "Continue_from_Previous_Run?" == "yes"]
     }
     obj.tasks = []
     return obj
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py
index 78991de4e8..b6cdaa497c 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py
@@ -76,16 +76,7 @@ class Plan(pydantic.BaseModel):
     print("Fetching results...")
     fps = list[list[str]]()
     for task in self.tasks:
-      runtime = task.runtime
-      ref = task.ref
-      task_dir = os.path.join(local_dir, task.name)
-      os.makedirs(task_dir, exist_ok=True)
-      fps_task = list[str]()
-      assert ref is not None
-      for remote_fp in task.ls():
-        fp = runtime.download(remote_fp, task_dir, task)
-        fps_task.append(fp)
-      fps.append(fps_task)
+      fps.append(task.download_all(local_dir))
     print("Results fetched.")
     self.save_json(os.path.join(local_dir, "plan.json"))
     return fps
@@ -101,26 +92,26 @@ class Plan(pydantic.BaseModel):
 
   def status(self) -> None:
     statuses = self.__stage_status__()
+    print(f"Plan {self.id} ({len(self.tasks)} tasks):")
     for task, status in zip(self.tasks, statuses):
-      print(f"{task.name}: {status}")
+      print(f"* {task.name}: {status}")
 
-  def join(self, check_every_n_mins: float = 0.1) -> None:
+  def wait_for_completion(self, check_every_n_mins: float = 0.1) -> None:
     n = len(self.tasks)
     try:
       with Progress() as progress:
-        pbars = [progress.add_task(f"{task.name} ({i+1}/{n})", total=None) for 
i, task in enumerate(self.tasks)]
-        completed = [False] * n
-        while not all(completed):
+        pbars = [progress.add_task(f"{task.name} ({i+1}/{n}): CHECKING", 
total=None) for i, task in enumerate(self.tasks)]
+        while True:
+          completed = [False] * n
           statuses = self.__stage_status__()
-          for i, (task, status) in enumerate(zip(self.tasks, statuses)):
-            pbar = pbars[i]
-            progress.update(pbar, description=f"{task.name} ({i+1}/{n}): 
{status}")
-            if is_terminal_state(status):
-              completed[i] = True
-              progress.update(pbar, completed=True)
+          for i, (task, status, pbar) in enumerate(zip(self.tasks, statuses, 
pbars)):
+            completed[i] = is_terminal_state(status)
+            progress.update(pbar, description=f"{task.name} ({i+1}/{n}): 
{status}", completed=completed[i], refresh=True)
+          if all(completed):
+            break
           sleep_time = check_every_n_mins * 60
           time.sleep(sleep_time)
-        print("Task(s) complete.")
+        print("All tasks completed.")
     except KeyboardInterrupt:
       print("Interrupted by user.")
 
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py
index 16f5c41dc1..0633e4d76b 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py
@@ -20,19 +20,10 @@ from typing import Any
 from pathlib import Path
 
 import pydantic
-import requests
-import uuid
-import time
 
 # from .task import Task
 Task = Any
 
-def is_terminal_state(x): return x in ["CANCELED", "COMPLETED", "FAILED"]
-
-
-conn_svc_url = "api.gateway.cybershuttle.org"
-
-
 class Runtime(abc.ABC, pydantic.BaseModel):
 
   id: str
@@ -145,53 +136,42 @@ class Remote(Runtime):
   def execute(self, task: Task) -> None:
     assert task.ref is None
     assert task.agent_ref is None
+    assert {"cluster", "queue_name", "node_count", "cpu_count", 
"walltime"}.issubset(self.args.keys())
+    print(f"[Remote] Creating Experiment: name={task.name}")
 
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
-    print(f"[Remote] Experiment Created: name={task.name}")
-    assert "cluster" in self.args
-    task.agent_ref = str(uuid.uuid4())
-    launch_state = av.launch_experiment(
-        experiment_name=task.name,
-        app_name=task.app_id,
-        inputs={**task.inputs, "agent_id": task.agent_ref, "server_url": 
conn_svc_url},
-        computation_resource_name=str(self.args["cluster"]),
-        queue_name=str(self.args["queue_name"]),
-        node_count=int(self.args["node_count"]),
-        cpu_count=int(self.args["cpu_count"]),
-        walltime=int(self.args["walltime"]),
-    )
-    task.ref = launch_state.experiment_id
-    task.workdir = launch_state.experiment_dir
-    task.sr_host = launch_state.sr_host
-    print(f"[Remote] Experiment Launched: id={task.ref}")
+    try:
+      launch_state = av.launch_experiment(
+          experiment_name=task.name,
+          app_name=task.app_id,
+          inputs=task.inputs,
+          computation_resource_name=str(self.args["cluster"]),
+          queue_name=str(self.args["queue_name"]),
+          node_count=int(self.args["node_count"]),
+          cpu_count=int(self.args["cpu_count"]),
+          walltime=int(self.args["walltime"]),
+      )
+      task.agent_ref = launch_state.agent_ref
+      task.pid = launch_state.process_id
+      task.ref = launch_state.experiment_id
+      task.workdir = launch_state.experiment_dir
+      task.sr_host = launch_state.sr_host
+      print(f"[Remote] Experiment Launched: id={task.ref}")
+    except Exception as e:
+      print(f"[Remote] Failed to launch experiment: {e}")
+      raise e
 
   def execute_py(self, libraries: list[str], code: str, task: Task) -> None:
+    assert task.ref is not None
+    assert task.agent_ref is not None
     print(f"* Packages: {libraries}")
     print(f"* Code:\n{code}")
-    try:
-      res = 
requests.post(f"https://{conn_svc_url}/api/v1/agent/executepythonrequest";, 
json={
-          "libraries": libraries,
-          "code": code,
-          "pythonVersion": "3.10", # TODO verify
-          "keepAlive": False, # TODO verify
-          "parentExperimentId": "/data", # the working directory
-          "agentId": task.agent_ref,
-      })
-      data = res.json()
-      if data["error"] is not None:
-        raise Exception(data["error"])
-      else:
-        exc_id = data["executionId"]
-        while True:
-          res = 
requests.get(f"https://{conn_svc_url}/api/v1/agent/executepythonresponse/{exc_id}";)
-          data = res.json()
-          if data["available"]:
-            response = data["responseString"]
-            return print(response)
-          time.sleep(1)
-    except Exception as e:
-      print(f"\nRemote execution failed! {e}")
+
+    from .airavata import AiravataOperator
+    av = AiravataOperator(context.access_token)
+    result = av.execute_py(libraries, code, task.agent_ref)
+    print(result)
 
   def status(self, task: Task):
     assert task.ref is not None
@@ -205,149 +185,72 @@ class Remote(Runtime):
   def signal(self, signal: str, task: Task) -> None:
     assert task.ref is not None
     assert task.agent_ref is not None
-
+    
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
     av.stop_experiment(task.ref)
 
   def ls(self, task: Task) -> list[str]:
     assert task.ref is not None
+    assert task.pid is not None
     assert task.agent_ref is not None
     assert task.sr_host is not None
     assert task.workdir is not None
 
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
-
-    res = 
requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest";, 
json={
-        "agentId": task.agent_ref,
-        "workingDir": ".",
-        "arguments": ["ls", "/data"]
-    })
-    data = res.json()
-    if data["error"] is not None:
-      if str(data["error"]) == "Agent not found":
-        return av.list_files(task.sr_host, task.workdir)
-      else:
-        raise Exception(data["error"])
-    else:
-      exc_id = data["executionId"]
-      while True:
-        res = 
requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}";)
-        data = res.json()
-        if data["available"]:
-          files = data["responseString"].split("\n")
-          return files
-        time.sleep(1)
+    files = av.list_files(task.pid, task.agent_ref, task.sr_host, task.workdir)
+    return files
 
   def upload(self, file: Path, task: Task) -> str:
     assert task.ref is not None
+    assert task.pid is not None
     assert task.agent_ref is not None
     assert task.sr_host is not None
     assert task.workdir is not None
 
-    import os
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
-
-    res = 
requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest";, 
json={
-        "agentId": task.agent_ref,
-        "workingDir": ".",
-        "arguments": ["cat", os.path.join("/data", file)]
-    })
-    data = res.json()
-    if data["error"] is not None:
-      if str(data["error"]) == "Agent not found":
-        return av.upload_files(task.sr_host, [file], task.workdir).pop()
-      else:
-        raise Exception(data["error"])
-    else:
-      exc_id = data["executionId"]
-      while True:
-        res = 
requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}";)
-        data = res.json()
-        if data["available"]:
-          files = data["responseString"]
-          return files
-        time.sleep(1)
+    result = av.upload_files(task.pid, task.agent_ref, task.sr_host, [file], 
task.workdir).pop()
+    return result
 
   def download(self, file: str, local_dir: str, task: Task) -> str:
     assert task.ref is not None
+    assert task.pid is not None
     assert task.agent_ref is not None
     assert task.sr_host is not None
     assert task.workdir is not None
 
-    import os
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
-
-    res = 
requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest";, 
json={
-        "agentId": task.agent_ref,
-        "workingDir": ".",
-        "arguments": ["cat", os.path.join("/data", file)]
-    })
-    data = res.json()
-    if data["error"] is not None:
-      if str(data["error"]) == "Agent not found":
-        return av.download_file(task.sr_host, os.path.join(task.workdir, 
file), local_dir)
-      else:
-        raise Exception(data["error"])
-    else:
-      exc_id = data["executionId"]
-      while True:
-        res = 
requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}";)
-        data = res.json()
-        if data["available"]:
-          content = data["responseString"]
-          path = Path(local_dir) / Path(file).name
-          with open(path, "w") as f:
-            f.write(content)
-          return path.as_posix()
-        time.sleep(1)
+    result = av.download_file(task.pid, task.agent_ref, task.sr_host, file, 
task.workdir, local_dir)
+    return result
 
   def cat(self, file: str, task: Task) -> bytes:
     assert task.ref is not None
+    assert task.pid is not None
     assert task.agent_ref is not None
     assert task.sr_host is not None
     assert task.workdir is not None
 
-    import os
     from .airavata import AiravataOperator
     av = AiravataOperator(context.access_token)
-
-    res = 
requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest";, 
json={
-        "agentId": task.agent_ref,
-        "workingDir": ".",
-        "arguments": ["cat", os.path.join("/data", file)]
-    })
-    data = res.json()
-    if data["error"] is not None:
-      if str(data["error"]) == "Agent not found":
-        return av.cat_file(task.sr_host, os.path.join(task.workdir, file))
-      else:
-        raise Exception(data["error"])
-    else:
-      exc_id = data["executionId"]
-      while True:
-        res = 
requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}";)
-        data = res.json()
-        if data["available"]:
-          content = str(data["responseString"]).encode()
-          return content
-        time.sleep(1)
+    content = av.cat_file(task.pid, task.agent_ref, task.sr_host, file, 
task.workdir)
+    return content
 
   @staticmethod
   def default():
-    return Remote(cluster="login.expanse.sdsc.edu", category="gpu", 
queue_name="gpu-shared", node_count=1, cpu_count=24, walltime=30)
-    
+    return list_runtimes(cluster="login.expanse.sdsc.edu", 
category="gpu").pop()
+
 
 def list_runtimes(
     cluster: str | None = None,
     category: str | None = None,
 ) -> list[Runtime]:
-  all_runtimes = list[Runtime]([
-    Remote(cluster="login.expanse.sdsc.edu", category="gpu", 
queue_name="gpu-shared", node_count=1, cpu_count=10, walltime=30),
-    Remote(cluster="login.expanse.sdsc.edu", category="cpu", 
queue_name="shared", node_count=1, cpu_count=10, walltime=30),
-    Remote(cluster="anvil.rcac.purdue.edu", category="cpu", 
queue_name="shared", node_count=1, cpu_count=24, walltime=30),
-  ])
-  return [*filter(lambda r: (cluster in [None, r.args["cluster"]]) and 
(category in [None, r.args["category"]]), all_runtimes)]
\ No newline at end of file
+  from .airavata import AiravataOperator
+  av = AiravataOperator(context.access_token)
+  all_runtimes = av.get_available_runtimes()
+  return [*filter(lambda r: (cluster in [None, r.args["cluster"]]) and 
(category in [None, r.args["category"]]), all_runtimes)]
+
+def is_terminal_state(x):
+  return x in ["CANCELED", "COMPLETED", "FAILED"]
\ No newline at end of file
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py
 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py
index a42ebea4e9..1612f44f74 100644
--- 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py
+++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py
@@ -17,6 +17,7 @@ from __future__ import annotations
 from typing import Any
 import pydantic
 from .runtime import Runtime
+from rich.progress import Progress
 
 class Task(pydantic.BaseModel):
 
@@ -25,6 +26,7 @@ class Task(pydantic.BaseModel):
   inputs: dict[str, Any]
   runtime: Runtime
   ref: str | None = pydantic.Field(default=None)
+  pid: str | None = pydantic.Field(default=None)
   agent_ref: str | None = pydantic.Field(default=None)
   workdir: str | None = pydantic.Field(default=None)
   sr_host: str | None = pydantic.Field(default=None)
@@ -70,6 +72,21 @@ class Task(pydantic.BaseModel):
     Path(local_dir).mkdir(parents=True, exist_ok=True)
     return self.runtime.download(file, local_dir, self)
   
+  def download_all(self, local_dir: str) -> list[str]:  # download every file reported by ls() into local_dir
+    assert self.ref is not None
+    import os
+    os.makedirs(local_dir, exist_ok=True)
+    fps_task = list[str]()  # per-file results of runtime.download, in listing order
+    files = self.ls()  # list remote files once; reused for the progress total and the loop
+    with Progress() as progress:
+      pbar = progress.add_task("Downloading: ...", total=len(files))
+      for remote_fp in files:  # iterate the same listing the total was computed from
+        fp = self.runtime.download(remote_fp, local_dir, self)
+        progress.update(pbar, description=f"Downloading: {remote_fp}", advance=1)
+        fps_task.append(fp)
+      progress.update(pbar, description="Downloading: DONE", refresh=True)
+    return fps_task
+  
   def cat(self, file: str) -> bytes:
     assert self.ref is not None
     return self.runtime.cat(file, self)
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml
index 14b3f9653f..c4b9f9cf42 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "airavata-python-sdk-test"
-version = "0.0.6.post3"
+version = "0.0.10.post1"
 description = "Apache Airavata Python SDK"
 readme = "README.md"
 license = { text = "Apache License 2.0" }
@@ -14,7 +14,7 @@ dependencies = [
   "oauthlib",
   "requests",
   "requests-oauthlib",
-  "thrift~=0.21.0",
+  "thrift",
   "thrift_connector",
   "paramiko",
   "scp",
diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb
deleted file mode 100644
index 8b1fc79325..0000000000
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb
+++ /dev/null
@@ -1,384 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "# Cybershuttle SDK -  Molecular Dynamics\n",
-    "> Define, run, monitor, and analyze molecular dynamics experiments in a 
HPC-agnostic way.\n",
-    "\n",
-    "This notebook shows how users can setup and launch a **NAMD** experiment 
with replicas, monitor its execution, and run analyses both during and after 
execution."
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Installing Required Packages\n",
-    "\n",
-    "First, install the `airavata-python-sdk-test` package from the pip 
repository."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "%pip install -e airavata-api/airavata-client-sdks/airavata-python-sdk"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Importing the SDK"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "%cd airavata-api/airavata-client-sdks/airavata-python-sdk/samples\n",
-    "import airavata_experiments as ae\n",
-    "from airavata_experiments import md"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Authenticating\n",
-    "\n",
-    "To authenticate for remote execution, call the `ae.login()` method.\n",
-    "This method will prompt you to enter your credentials and authenticate 
your session."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "ae.login()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Once authenticated, the `ae.list_runtimes()` function can be called to 
list HPC resources that the user has access to."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "runtimes = ae.list_runtimes()\n",
-    "ae.display(runtimes)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Uploading Experiment Files\n",
-    "\n",
-    "Drag and drop experiment files onto the workspace that this notebook is 
run on.\n",
-    "\n",
-    "```bash\n",
-    "(sh) $: tree data\n",
-    "data\n",
-    "├── b4pull.pdb\n",
-    "├── b4pull.restart.coor\n",
-    "├── b4pull.restart.vel\n",
-    "├── b4pull.restart.xsc\n",
-    "├── par_all36_water.prm\n",
-    "├── par_all36m_prot.prm\n",
-    "├── pull.conf\n",
-    "├── structure.pdb\n",
-    "└── structure.psf\n",
-    "\n",
-    "1 directory, 9 files\n",
-    "\n",
-    "```"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Defining a NAMD Experiment\n",
-    "\n",
-    "The `md.NAMD.initialize()` is used to define a NAMD experiment.\n",
-    "Here, provide the paths to the `.conf` file, the `.pdb` file, the `.psf` 
file, any optional files you want to run NAMD on.\n",
-    "You can preview the function definition through auto-completion.\n",
-    "\n",
-    "```python\n",
-    "def initialize(\n",
-    "    name: str,\n",
-    "    config_file: str,\n",
-    "    pdb_file: str,\n",
-    "    psf_file: str,\n",
-    "    ffp_files: list[str],\n",
-    "    other_files: list[str] = [],\n",
-    "    parallelism: Literal['CPU', 'GPU'] = \"CPU\",\n",
-    "    num_replicas: int = 1\n",
-    ") -> Experiment[ExperimentApp]\n",
-    "```\n",
-    "\n",
-    "To add replica runs, simply call the `exp.add_replica()` function.\n",
-    "You can call the `add_replica()` function as many times as you want 
replicas.\n",
-    "Any optional resource constraint can be provided here.\n",
-    "\n",
-    "You can also call `ae.display()` to pretty-print the experiment."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "exp = md.NAMD.initialize(\n",
-    "    name=\"yasith_namd_experiment\",\n",
-    "    config_file=\"data/pull.conf\",\n",
-    "    pdb_file=\"data/structure.pdb\",\n",
-    "    psf_file=\"data/structure.psf\",\n",
-    "    ffp_files=[\n",
-    "      \"data/par_all36_water.prm\",\n",
-    "      \"data/par_all36m_prot.prm\"\n",
-    "    ],\n",
-    "    other_files=[\n",
-    "      \"data/b4pull.pdb\",\n",
-    "      \"data/b4pull.restart.coor\",\n",
-    "      \"data/b4pull.restart.vel\",\n",
-    "      \"data/b4pull.restart.xsc\",\n",
-    "    ],\n",
-    "    parallelism=\"GPU\",\n",
-    ")\n",
-    "exp.add_replica(*ae.list_runtimes(cluster=\"login.expanse.sdsc.edu\", 
category=\"gpu\"))\n",
-    "ae.display(exp)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Creating an Execution Plan\n",
-    "\n",
-    "Call the `exp.plan()` function to transform the experiment definition + 
replicas into a stateful execution plan."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan = exp.plan()\n",
-    "ae.display(plan)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Saving the Plan\n",
-    "\n",
-    "A created plan can be saved locally (in JSON) or remotely (in a 
user-local DB) for later reference."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan.save()  # this will save the plan in DB\n",
-    "plan.save_json(\"plan.json\")  # save the plan state locally"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Launching the Plan\n",
-    "\n",
-    "A created plan can be launched using the `plan.launch()` function.\n",
-    "Changes to plan states will be automatically saved onto the remote.\n",
-    "However, plan state can also be tracked locally by invoking 
`plan.save_json()`."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan.launch()\n",
-    "plan.save_json(\"plan.json\")"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Loading a Saved Plan\n",
-    "\n",
-    "A saved plan can be loaded by calling `ae.plan.load_json(plan_path)` (for 
local plans) or `ae.plan.load(plan_id)` (for remote plans)."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan = ae.plan.load_json(\"plan.json\")\n",
-    "plan = ae.plan.load(plan.id)\n",
-    "ae.display(plan)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Fetching User-Defined Plans\n",
-    "\n",
-    "The `ae.plan.query()` function retrieves all plans stored in the remote."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plans = ae.plan.query()\n",
-    "ae.display(plans)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Checking Plan Status\n",
-    "\n",
-    "The plan's execution status can be checked by calling `plan.status()`."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan.status()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Managing Plan Execution\n",
-    "\n",
-    "The `plan.stop()` function will stop a currently executing plan.\n",
-    "The `plan.join()` function would block code execution until the plan 
completes its execution."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan.stop()\n",
-    "plan.join()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Running File Operations"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Displaying the status and files generated by each replica (task)"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "for task in plan.tasks:\n",
-    "    status = task.status()\n",
-    "    print(status)\n",
-    "    # task.upload(\"data/sample.txt\")\n",
-    "    files = task.ls()\n",
-    "    display(files)\n",
-    "    display(task.cat(\"NAMD.stderr\"))\n",
-    "    # task.download(\"NAMD.stdout\", \"./results\")\n",
-    "    task.download(\"NAMD_Repl_1.out\", \"./results\")"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Displaying the intermediate results generated by each replica (task)"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "for index, task in enumerate(plan.tasks):\n",
-    "\n",
-    "    @task.context(packages=[\"matplotlib\", \"pandas\"])\n",
-    "    def analyze(x, y, index, num_tasks) -> None:\n",
-    "        from matplotlib import pyplot as plt\n",
-    "        import pandas as pd\n",
-    "        df = pd.read_csv(\"data.csv\")\n",
-    "        plt.figure(figsize=(x, y))\n",
-    "        plt.plot(df[\"x\"], df[\"y\"], marker=\"o\", linestyle=\"-\", 
linewidth=2, markersize=6)\n",
-    "        plt.title(f\"Plot for Replica {index} of {num_tasks}\")\n",
-    "\n",
-    "    analyze(3, 4, index+1, len(plan.tasks))"
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "airavata",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.12.7"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
diff --git a/dev-tools/deployment/scripts/.gitkeep 
b/dev-tools/deployment/scripts/.gitkeep
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh 
b/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh
new file mode 100755
index 0000000000..d752b6baa8
--- /dev/null
+++ b/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh
@@ -0,0 +1,133 @@
+#!/bin/bash -x
+
+# #####################################################################
+# AlphaFold2 Driver + Airavata Agent for Expanse
+# #####################################################################
+#
+# ----------------------------------------------------------------------
+# CONTRIBUTORS
+# ----------------------------------------------------------------------
+# * Sudhakar Pamidigantham
+# * Lahiru Jayathilake
+# * Dimuthu Wannipurage
+# * Yasith Jayawardana
+#
+# ######################################################################
+
+########################################################################
+# Part 1 - Housekeeping
+########################################################################
+
+#-----------------------------------------------------------------------
+# Step 1.1 - Check command line
+#-----------------------------------------------------------------------
+
+while getopts t:p:m: option; do
+  case $option in
+  t) MaxDate=$OPTARG ;;
+  p) MODEL_PRESET=$OPTARG ;;
+  m) Num_Multi=$OPTARG ;;
+  \?) cat <<ENDCAT ;;
+>! Usage: $0  [-t Maximum Template Date ]    !<
+>!            [-p  Model Preset ]      !<
+>!            [-m  Number of Multimers per Model ]      !<
+ENDCAT
+    #   exit 1 ;;
+  esac
+done
+
+if [ -z "$Num_Multi" ]; then # -m omitted: default to one multimer prediction per model
+  export Num_Multi=1
+fi
+#set the environment PATH
+export PYTHONNOUSERSITE=True
+module reset
+module load singularitypro
+ALPHAFOLD_DATA_PATH=/expanse/projects/qstore/data/alphafold-v2.3.2
+ALPHAFOLD_MODELS=/expanse/projects/qstore/data/alphafold-v2.3.2/params
+
+#ALPHAFOLD_DATA_PATH=/expanse/projects/qstore/data/alphafold
+#ALPHAFOLD_MODELS=/expanse/projects/qstore/data/alphafold/params
+pdb70=""
+uniprot=""
+pdbseqres=""
+nummulti=""
+
+# check_flags
+if [ "monomer" = "${MODEL_PRESET%_*}" ]; then
+  export pdb70="--pdb70_database_path=/data/pdb70/pdb70"
+else
+  export uniprot="--uniprot_database_path=/data/uniprot/uniprot.fasta"
+  export pdbseqres="--pdb_seqres_database_path=/data/pdb_seqres/pdb_seqres.txt"
+  export nummulti="--num_multimer_predictions_per_model=$Num_Multi"
+fi
+
+## Copy input to node local scratch
+cp input.fasta /scratch/$USER/job_$SLURM_JOBID
+#cp -r /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08 
/scratch/$USER/job_$SLURM_JOBID/
+cd /scratch/$USER/job_$SLURM_JOBID
+ln -s /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08
+mkdir bfd
+cp /expanse/projects/qstore/data/alphafold/bfd/*index bfd/
+#cp 
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata
 bfd/
+#cp 
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata
 bfd/
+cd bfd
+ln -s 
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata
+ln -s 
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata
+ln -s 
/expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_a3m.ffdata
+cd ../
+mkdir alphafold_output
+# Create a soft link (rundir) in the submit dir pointing to the scratch run dir
+
+ln -s /scratch/$USER/job_$SLURM_JOBID $SLURM_SUBMIT_DIR/rundir
+
+#Run the command
+singularity run --nv \
+  -B /expanse/lustre \
+  -B /expanse/projects \
+  -B /scratch \
+  -B $ALPHAFOLD_DATA_PATH:/data \
+  -B $ALPHAFOLD_MODELS \
+  /cm/shared/apps/containers/singularity/alphafold/alphafold_aria2_v2.3.2.simg 
\
+  --fasta_paths=/scratch/$USER/job_$SLURM_JOBID/input.fasta \
+  --uniref90_database_path=/data/uniref90/uniref90.fasta \
+  --data_dir=/data \
+  --mgnify_database_path=/data/mgnify/mgy_clusters_2022_05.fa \
+  
--bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt
 \
+  --uniref30_database_path=/data/uniref30/UniRef30_2021_03 \
+  $pdbseqres \
+  $pdb70 \
+  $uniprot \
+  --template_mmcif_dir=/data/pdb_mmcif/mmcif_files \
+  --obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \
+  --output_dir=/scratch/$USER/job_$SLURM_JOBID/alphafold_output \
+  --max_template_date=$MaxDate \
+  --model_preset=$MODEL_PRESET \
+  --use_gpu_relax=true \
+  --models_to_relax=best \
+  $nummulti
+
+#-B .:/etc \
+#/cm/shared/apps/containers/singularity/alphafold/alphafold.sif  \
+#--fasta_paths=input.fasta  \
+#--uniref90_database_path=/data/uniref90/uniref90.fasta  \
+#--data_dir=/data \
+#--mgnify_database_path=/data/mgnify/mgy_clusters.fa   \
+#--bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt
 \
+#--uniclust30_database_path=/scratch/$USER/job_$SLURM_JOBID/uniclust30_2018_08/uniclust30_2018_08
 \
+#--pdb70_database_path=/data/pdb70/pdb70  \
+#--template_mmcif_dir=/data/pdb_mmcif/mmcif_files  \
+#--obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \
+#--output_dir=alphafold_output  \
+#--max_template_date=$MaxDate \
+#--preset=$MODEL_PRESET
+
+#make a user choice --preset=casp14
+#make this a user choice  --max_template_date=2020-05-14   \
+# --model_names='model_1' \
+# Remove model data
+unlink $SLURM_SUBMIT_DIR/rundir
+
+### Copy back results
+
+tar -cvf $SLURM_SUBMIT_DIR/alphafold_output.tar alphafold_output
diff --git a/dev-tools/deployment/scripts/expanse/namd-agent.sh 
b/dev-tools/deployment/scripts/expanse/namd-agent.sh
new file mode 100755
index 0000000000..b2a7301822
--- /dev/null
+++ b/dev-tools/deployment/scripts/expanse/namd-agent.sh
@@ -0,0 +1,232 @@
+#!/bin/bash -x
+
+# #####################################################################
+# NAMD Driver  + Airavata Agent for Expanse
+# #####################################################################
+#
+# ----------------------------------------------------------------------
+# CONTRIBUTORS
+# ----------------------------------------------------------------------
+# * Sudhakar Pamidigantham
+# * Diego Gomes
+# * Lahiru Jayathilake
+# * Yasith Jayawardana
+#
+# ----------------------------------------------------------------------
+# CHANGELOG
+# ----------------------------------------------------------------------
+# * 2024/12/13 - Agent subprocess and graceful shutdown (Yasith J)
+# * 2024/12/09 - Reviewed (Diego Gomes)
+# ######################################################################
+
+########################################################################
+# Part 1 - Housekeeping
+########################################################################
+
+#-----------------------------------------------------------------------
+# Step 1.1 - Check command line
+#-----------------------------------------------------------------------
+if [ $# -lt 1 -o $# -gt 15 ]; then
+  echo 1>&2 "Usage: $0 -t [CPU/GPU] -r [PJobID] -l [Continue_Replicas_list] -n 
[Number_of_Replicas] -i input_conf [SEAGrid_UserName] "
+  exit 127
+fi
+
+# subdir depends on whether we're doing freq, water or PES. For freq and water,
+# it should be hardcoded in the Xbaya workflow. For PES, it should be an
+# additional array generated by the frontend. The contents of this array are
+# trivial, but creating an extra Xbaya service to generate it would add
+# unnecessary extra complexity. Besides, the frontend cannot avoid having to
+# pass at least one array: the array with gjf files.
+
+subdir="$PWD"
+while getopts t:r:l:n:i:a:s: option; do # parse the flags this driver accepts
+  case $option in
+  t) ExeTyp=$OPTARG ;;     # execution type, compared against "CPU"/"GPU" below
+  r) PJobID=$OPTARG ;;     # previous job ID (continuation run)
+  l) rep_list=$OPTARG ;;   # replica list for continuation
+  n) num_rep=$OPTARG ;;    # number of replicas to run
+  i) input=$OPTARG ;;      # NAMD input configuration file
+  a) agent_id=$OPTARG ;;   # passed to the airavata-agent launched below
+  s) server_url=$OPTARG ;; # agent connects to $server_url:19900
+  \?) cat <<ENDCAT ;;
+>! Usage: $0  [-t  execution type CPU/GPU ] [-i input configuration file ]    !<
+>!            [-r  Previous JobID for continuation (optional)]      !<
+>!            [-l  replica list for continuation (optional)]     !<
+>!            [-n  Number of replicas to run (optional)] [-a agent id ] [-s server url ]     !<
+ENDCAT
+  esac
+done
+
+echo "ExeTyp=$ExeTyp"
+echo "PJobID=$PJobID"
+echo "rep_list=$rep_list"
+echo "num_rep=$num_rep"
+echo "input=$input"
+echo "agent_id=$agent_id"
+echo "server_url=$server_url"
+
+# ----------------------------------------------------------------------
+# RUN AGENT AS SUBPROCESS (for now)
+# ----------------------------------------------------------------------
+SIF_PATH=/home/scigap/agent-framework/airavata-agent.sif
+module load singularitypro
+singularity exec --bind $PWD:/data $SIF_PATH bash -c "/opt/airavata-agent 
$server_url:19900 $agent_id" &
+agent_pid=$! # save agent PID for graceful shutdown
+
+#-----------------------------------------------------------------------
+# Step 1.2 - Validate inputs
+#-----------------------------------------------------------------------
+
+if [ -z "$AIRAVATA_USERNAME" ]; then
+  echo "Missing AIRAVATA_USERNAME. Check with support!"
+  kill -TERM $agent_pid; exit 1 # stop the background agent and fail the job (bare exit returned 0)
+fi
+
+if [ -z "$ExeTyp" ]; then
+  echo "Missing Execution Type: [CPU, GPU]"
+  kill -TERM $agent_pid; exit 1 # stop the background agent and fail the job (bare exit returned 0)
+fi
+
+SG_UserName="$AIRAVATA_USERNAME"
+echo "Execution Type: $ExeTyp"
+
+#-----------------------------------------------------------------------
+# Step 1.3 - Get the input configuration filename
+#-----------------------------------------------------------------------
+filename=$(basename -- "$input")
+filename="${filename%.*}"
+
+#-----------------------------------------------------------------------
+# Step 1.4 - Copy previous files if this a continuation.
+#-----------------------------------------------------------------------
+if [ "$PJobID" ]; then
+  cp $input saveInput # save the new configuration before the archive copy can overwrite it
+  ls -lt $localarc/$PJobID/ # NOTE(review): localarc is not assigned anywhere in this script - presumably exported by the job environment; confirm
+  cp -r $localarc/$PJobID/. . # copy the previous job's files into the working directory
+  cp saveInput $input # restore the saved configuration over any copied-in version
+fi
+
+#-----------------------------------------------------------------------
+# Step 1.5 - Create folders for replicas (if necessary)
+#-----------------------------------------------------------------------
+echo " Creating folders for replica run(s)"
+input_files=$(ls *.* | grep -v slurm)
+
+# Create one subdirectory per replica and copy over the inputs
+for i in $(seq 1 ${num_rep}); do
+  if [ ! -d ${i} ]; then
+    mkdir ${i}
+    cp $input_files ${i}/
+  fi
+done
+
+########################################################################
+# Part 2 - Machine specific Options (SDSC-Expanse)
+########################################################################
+
+#-----------------------------------------------------------------------
+# Step 2.1 - Load modules (SDSC-Expanse)
+#-----------------------------------------------------------------------
+
+module purge
+module load slurm/expanse/current
+
+if [ $ExeTyp = "CPU" ]; then
+  echo "Loading CPU modules"
+  module load cpu/0.17.3b gcc/10.2.0 openmpi/4.1.1
+fi
+if [ $ExeTyp = "GPU" ]; then
+  echo "Loading GPU modules"
+  module load gpu/0.17.3b
+fi
+
+module list
+
+#-----------------------------------------------------------------------
+# Step 2.2 - Set NAMD binary and command line for SDSC-Expanse
+#-----------------------------------------------------------------------
+APP_PATH=/home/scigap/applications
+if [ $ExeTyp == "CPU" ]; then
+  export NAMDPATH="$APP_PATH/NAMD_3.1alpha2_Linux-x86_64-multicore"
+fi
+if [ $ExeTyp == "GPU" ]; then
+  export NAMDPATH="$APP_PATH/NAMD_3.1alpha2_Linux-x86_64-multicore-CUDA"
+fi
+
+#-----------------------------------------------------------------------
+# Step 2.3 A - Run NAMD3 (CPU, Serial)
+#-----------------------------------------------------------------------
+# - one replica at a given time
+# - each replica uses all CPUs
+#-----------------------------------------------------------------------
+if [ ${ExeTyp} == "CPU" ]; then
+  for replica in $(seq 1 ${num_rep}); do
+    cd ${subdir}/${replica}/ # Go to folder
+
+    # Run NAMD3
+    ${NAMDPATH}/namd3 \
+      +setcpuaffinity \
+      +p ${SLURM_CPUS_ON_NODE} \
+      $input >${filename}.out 2>${filename}.err
+  done
+fi
+
+#-----------------------------------------------------------------------
+# Step 2.3 B - Run NAMD3 (GPU, Batched)
+#-----------------------------------------------------------------------
+# - one replica PER GPU at a given time
+# - each replica uses all CPUs
+#-----------------------------------------------------------------------
+if [ ${ExeTyp} == "GPU" ]; then
+  GPU_ID=0
+  subtask_pids=()
+  for replica in $(seq 1 ${num_rep}); do
+    cd ${subdir}/${replica}/ # Go to folder
+
+    # Run NAMD3 in background
+    ${NAMDPATH}/namd3 \
+      +setcpuaffinity \
+      +p ${SLURM_CPUS_ON_NODE} \
+      +devices ${GPU_ID} \
+      $input >${filename}.out 2>${filename}.err &
+
+    subtask_pids+=($!) # Store PID of the background NAMD task
+    let GPU_ID+=1      # Increment GPU_ID
+    
+    # Wait for a batch of replicas to complete
+    if [ ${GPU_ID} == ${SLURM_GPUS_ON_NODE} ]; then
+      wait "${subtask_pids[@]}" # wait for current batch to complete
+      subtask_pids=()           # clear subtask_pids of current batch
+      GPU_ID=0                  # reset gpu counter
+    fi
+  done
+  
+  # Wait for the last batch of replicas to complete
+  if [ ${#subtask_pids[@]} -gt 0 ]; then
+    wait "${subtask_pids[@]}"   # wait for last batch to complete
+    subtask_pids=()             # clear subtask_pids of last batch
+  fi
+fi
+
+# Once done, go back to main folder
+cd ${subdir}
+
+########################################################################
+# Part 3 - Output Flattening
+########################################################################
+num_rep=${num_rep:-1} # flatten every replica that ran (-n) instead of a fixed 3; no -n means replica 1 only
+for replica in $(seq 1 ${num_rep}); do
+  for file in $(ls ${replica}/*.*); do
+    mv ${file} ${replica}"_"$(basename $file) # e.g. 2/run.out -> 2_run.out
+  done
+  rm -rf ${replica}/
+done
+
+# Send SIGTERM to agent, and wait for completion
+kill -TERM $agent_pid
+wait $agent_pid
+
+# Give it a break when jobs are done
+sleep 10
+
+# bye!
diff --git 
a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb 
b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb
index 1e5dba8f50..f89b022ecd 100644
--- a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb
+++ b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb
@@ -176,9 +176,10 @@
     "      \"data/b4pull.restart.vel\",\n",
     "      \"data/b4pull.restart.xsc\",\n",
     "    ],\n",
-    "    parallelism=\"GPU\",\n",
+    "    parallelism=\"CPU\",\n",
+    "    num_replicas=1,\n",
     ")\n",
-    "exp.add_replica()\n",
+    "exp.add_replica(*ae.list_runtimes(cluster=\"login.expanse.sdsc.edu\", 
category=\"cpu\"))\n",
     "ae.display(exp)"
    ]
   },
@@ -271,6 +272,7 @@
    "source": [
     "plan = ae.plan.load_json(\"plan.json\")\n",
     "plan = ae.plan.load(plan.id)\n",
+    "plan.status()\n",
     "ae.display(plan)"
    ]
   },
@@ -294,24 +296,6 @@
     "ae.display(plans)"
    ]
   },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Checking Plan Status\n",
-    "\n",
-    "The plan's execution status can be checked by calling `plan.status()`."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "plan.status()"
-   ]
-  },
   {
    "cell_type": "markdown",
    "metadata": {},
@@ -319,7 +303,7 @@
     "## Managing Plan Execution\n",
     "\n",
     "The `plan.stop()` function will stop a currently executing plan.\n",
-    "The `plan.join()` function would block code execution until the plan 
completes its execution."
+    "The `plan.wait_for_completion()` function would block until the plan 
finishes executing."
    ]
   },
   {
@@ -328,8 +312,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "plan.stop()\n",
-    "plan.join()"
+    "# plan.stop()\n",
+    "plan.wait_for_completion()"
    ]
   },
   {
@@ -361,14 +345,17 @@
    "outputs": [],
    "source": [
     "for task in plan.tasks:\n",
-    "    status = task.status()\n",
-    "    print(status)\n",
-    "    # task.upload(\"data/sample.txt\")\n",
-    "    files = task.ls()\n",
-    "    display(files)\n",
-    "    display(task.cat(\"NAMD.stderr\"))\n",
-    "    # task.download(\"NAMD.stdout\", \"./results\")\n",
-    "    task.download(\"NAMD_Repl_1.out\", \"./results\")"
+    "    print(task.name, task.pid)\n",
+    "    # display files\n",
+    "    display(task.ls())\n",
+    "    # upload a file\n",
+    "    task.upload(\"data/sample.txt\")\n",
+    "    # preview contents of a file\n",
+    "    display(task.cat(\"sample.txt\"))\n",
+    "    # download a specific file\n",
+    "    task.download(\"sample.txt\", f\"./results_{task.name}\")\n",
+    "    # download all files\n",
+    "    task.download_all(f\"./results_{task.name}\")"
    ]
   },
   {
@@ -390,17 +377,14 @@
    "outputs": [],
    "source": [
     "for index, task in enumerate(plan.tasks):\n",
-    "\n",
-    "    @task.context(packages=[\"matplotlib\", \"pandas\"])\n",
-    "    def analyze(x, y, index, num_tasks) -> None:\n",
-    "        from matplotlib import pyplot as plt\n",
-    "        import pandas as pd\n",
-    "        df = pd.read_csv(\"data.csv\")\n",
-    "        plt.figure(figsize=(x, y))\n",
-    "        plt.plot(df[\"x\"], df[\"y\"], marker=\"o\", linestyle=\"-\", 
linewidth=2, markersize=6)\n",
-    "        plt.title(f\"Plot for Replica {index} of {num_tasks}\")\n",
-    "\n",
-    "    analyze(3, 4, index+1, len(plan.tasks))"
+    "    @task.context(packages=[\"numpy\", \"pandas\"])\n",
+    "    def analyze() -> None:\n",
+    "        import numpy as np\n",
+    "        with open(\"pull.conf\", \"r\") as f:\n",
+    "            data = f.read()\n",
+    "        print(\"pull.conf has\", len(data), \"chars\")\n",
+    "        print(np.arange(10))\n",
+    "    analyze()"
    ]
   }
  ],

Reply via email to