This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 47d882e39604600d425d1159a08d1cdd55ca7623 Author: yasith <[email protected]> AuthorDate: Mon Dec 16 01:00:19 2024 -0600 add pre-submission validator, update storage dir, take CONNECTION_SVC_URL and FILEMGR_SVC_URL from settings.ini, cleanup runtime.py, add fallback apis to airavata.py, rearrange files, setup file ul/dl apis, improve ux, update notebooks, add agent scripts, refine code, fix bugs --- .../airavata-python-sdk/.gitignore | 2 +- .../airavata_experiments/__init__.py | 4 +- .../airavata_experiments/airavata.py | 390 ++++++++++++++++----- .../airavata_experiments/base.py | 6 +- .../airavata_experiments/md/applications.py | 26 +- .../airavata_experiments/plan.py | 35 +- .../airavata_experiments/runtime.py | 199 +++-------- .../airavata_experiments/task.py | 17 + .../airavata-python-sdk/pyproject.toml | 4 +- .../airavata-python-sdk/samples/poc.ipynb | 384 -------------------- dev-tools/deployment/scripts/.gitkeep | 0 .../deployment/scripts/expanse/alphafold2-agent.sh | 133 +++++++ dev-tools/deployment/scripts/expanse/namd-agent.sh | 232 ++++++++++++ .../jupyterhub/data/1_experiment_sdk.ipynb | 68 ++-- 14 files changed, 787 insertions(+), 713 deletions(-) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore index 2fb5c82ffc..794ce1ca9c 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore @@ -9,7 +9,7 @@ __pycache__/ .ipynb_checkpoints *.egg-info/ data/ -results/ +results*/ plan.json settings*.ini auth.state \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py index c6927de016..f45c820c4f 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py +++ 
b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py @@ -16,11 +16,11 @@ from __future__ import annotations -from . import base, md, plan +from . import base, plan from .auth import login, logout from .runtime import list_runtimes, Runtime -__all__ = ["login", "logout", "list_runtimes", "base", "md", "plan"] +__all__ = ["login", "logout", "list_runtimes", "base", "plan"] def display_runtimes(runtimes: list[Runtime]): """ diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py index 36c2efbf43..bf778a05df 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py @@ -18,20 +18,29 @@ import logging from pathlib import Path from typing import Literal, NamedTuple from .sftp import SFTPConnector +import time +import warnings +import requests +from urllib.parse import urlparse +import uuid +import os +import base64 import jwt from airavata.model.security.ttypes import AuthzToken from airavata.model.experiment.ttypes import ExperimentModel, ExperimentType, UserConfigurationDataModel from airavata.model.scheduling.ttypes import ComputationalResourceSchedulingModel from airavata.model.data.replica.ttypes import DataProductModel, DataProductType, DataReplicaLocationModel, ReplicaLocationCategory - from airavata_sdk.clients.api_server_client import APIServerClient +warnings.filterwarnings("ignore", category=DeprecationWarning) logger = logging.getLogger("airavata_sdk.clients") logger.setLevel(logging.INFO) LaunchState = NamedTuple("LaunchState", [ ("experiment_id", str), + ("agent_ref", str), + ("process_id", str), ("mount_point", Path), ("experiment_dir", str), ("sr_host", str), @@ -49,6 +58,8 @@ class Settings: self.API_SERVER_HOST = config.get('APIServer', 'API_HOST') 
self.API_SERVER_PORT = config.getint('APIServer', 'API_PORT') self.API_SERVER_SECURE = config.getboolean('APIServer', 'API_SECURE') + self.CONNECTION_SVC_URL = config.get('APIServer', 'CONNECTION_SVC_URL') + self.FILEMGR_SVC_URL = config.get('APIServer', 'FILEMGR_SVC_URL') # gateway settings self.GATEWAY_ID = config.get('Gateway', 'GATEWAY_ID') @@ -176,6 +187,12 @@ class AiravataOperator: def default_project_name(self): return self.settings.PROJECT_NAME + + def connection_svc_url(self): + return self.settings.CONNECTION_SVC_URL + + def filemgr_svc_url(self): + return self.settings.FILEMGR_SVC_URL def __airavata_token__(self, access_token: str, gateway_id: str): """ @@ -187,14 +204,22 @@ class AiravataOperator: claimsMap = {"userName": self.user_id, "gatewayID": gateway_id} return AuthzToken(accessToken=self.access_token, claimsMap=claimsMap) - def get_experiment(self, experiment_id: str): """ Get experiment by id """ return self.api_server_client.get_experiment(self.airavata_token, experiment_id) + + def get_process_id(self, experiment_id: str) -> str: + """ + Get process id by experiment id + """ + tree: any = self.api_server_client.get_detailed_experiment_tree(self.airavata_token, experiment_id) # type: ignore + processModels: list = tree.processes + assert len(processModels) == 1, f"Expected 1 process model, got {len(processModels)}" + return processModels[0].processId def get_accessible_apps(self, gateway_id: str | None = None): """ @@ -207,7 +232,6 @@ class AiravataOperator: app_interfaces = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) return app_interfaces - def get_preferred_storage(self, gateway_id: str | None = None, sr_hostname: str | None = None): """ Get preferred storage resource @@ -221,7 +245,6 @@ class AiravataOperator: sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname)) return self.api_server_client.get_gateway_storage_preference(self.airavata_token, gateway_id, sr_id) - def 
get_storage(self, storage_name: str | None = None) -> any: # type: ignore """ Get storage resource by name @@ -234,9 +257,6 @@ class AiravataOperator: sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name)) storage = self.api_server_client.get_storage_resource(self.airavata_token, sr_id) return storage - - - def get_group_resource_profile_id(self, grp_name: str | None = None) -> str: """ @@ -254,7 +274,6 @@ class AiravataOperator: grp: any = self.api_server_client.get_group_resource_profile(self.airavata_token, grp_id) # type: ignore return grp - def get_compatible_deployments(self, app_interface_id: str, grp_name: str | None = None): """ Get compatible deployments for an application interface and group resource profile @@ -268,7 +287,6 @@ class AiravataOperator: deployments = self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token, app_interface_id, grp_id) return deployments - def get_app_interface_id(self, app_name: str, gateway_id: str | None = None): """ Get application interface id by name @@ -278,7 +296,6 @@ class AiravataOperator: apps: list = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) # type: ignore app_id = next((app.applicationInterfaceId for app in apps if app.applicationName == app_name)) return str(app_id) - def get_project_id(self, project_name: str, gateway_id: str | None = None): gateway_id = str(gateway_id or self.default_gateway_id()) @@ -286,7 +303,6 @@ class AiravataOperator: project_id = next((p.projectID for p in projects if p.name == project_name and p.owner == self.user_id)) return str(project_id) - def get_application_inputs(self, app_interface_id: str) -> list: """ Get application inputs by id @@ -294,7 +310,6 @@ class AiravataOperator: """ return list(self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id)) # type: ignore - def get_compute_resources_by_ids(self, resource_ids: list[str]): """ 
Get compute resources by ids @@ -302,7 +317,6 @@ class AiravataOperator: """ return [self.api_server_client.get_compute_resource(self.airavata_token, resource_id) for resource_id in resource_ids] - def make_experiment_dir(self, sr_host: str, project_name: str, experiment_name: str) -> str: """ Make experiment directory on storage resource, and return the remote path @@ -317,68 +331,191 @@ class AiravataOperator: logger.info("Experiment directory created at %s", remote_path) return remote_path - - def upload_files(self, sr_host: str, local_files: list[Path], remote_dir: str) -> list[str]: + def upload_files(self, process_id: str | None, agent_ref: str | None, sr_host: str, local_files: list[Path], remote_dir: str) -> list[str]: """ Upload local files to a remote directory of a storage resource + TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ - host = sr_host - port = self.default_sftp_port() - sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) - paths = sftp_connector.put(local_files, remote_dir) - logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", remote_dir) - return paths + # step = experiment staging + if process_id is None and agent_ref is None: + host = sr_host + port = self.default_sftp_port() + sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) + paths = sftp_connector.put(local_files, remote_dir) + logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", remote_dir) + return paths + + # step = post-staging file upload + elif process_id is not None and agent_ref is not None: + assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}" + file = local_files[0] + fp = os.path.join("/data", file.name) + rawdata = file.read_bytes() + b64data = base64.b64encode(rawdata).decode() + res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + "agentId": 
agent_ref, + "workingDir": ".", + "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"] + }) + data = res.json() + if data["error"] is not None: + if str(data["error"]) == "Agent not found": + port = self.default_sftp_port() + sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) + paths = sftp_connector.put(local_files, remote_dir) + return paths + else: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: + res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + data = res.json() + if data["available"]: + return [fp] + time.sleep(1) + + # step = unknown + else: + raise ValueError("Invalid arguments for upload_files") + + # file manager service fallback + assert process_id is not None, f"Expected process_id, got {process_id}" + file = local_files[0] + url_path = os.path.join(process_id, file.name) + filemgr_svc_upload_url = f"{self.filemgr_svc_url()}/upload/live/{url_path}" - def list_files(self, sr_host: str, remote_dir: str) -> list[str]: + def list_files(self, process_id: str, agent_ref: str, sr_host: str, remote_dir: str) -> list[str]: """ List files in a remote directory of a storage resource + TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ - host = sr_host - port = self.default_sftp_port() - sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) - return sftp_connector.ls(remote_dir) - - - def download_file(self, sr_host: str, remote_file: str, local_dir: str) -> str: + res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + "agentId": agent_ref, + "workingDir": ".", + "arguments": ["sh", "-c", "cd /data && find . 
-type f -printf '%P\n'"] + }) + data = res.json() + if data["error"] is not None: + if str(data["error"]) == "Agent not found": + port = self.default_sftp_port() + sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) + return sftp_connector.ls(remote_dir) + else: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: + res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + data = res.json() + if data["available"]: + files = data["responseString"].split("\n") + return files + time.sleep(1) + + # file manager service fallback + assert process_id is not None, f"Expected process_id, got {process_id}" + filemgr_svc_ls_url = f"{self.filemgr_svc_url()}/list/live/{process_id}" + + def download_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: str, remote_dir: str, local_dir: str) -> str: """ Download files from a remote directory of a storage resource to a local directory + TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ - host = sr_host - port = self.default_sftp_port() - sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) - path = sftp_connector.get(remote_file, local_dir) - logger.info("Remote files downlaoded to local dir: %s", local_dir) - return path + import os + fp = os.path.join("/data", remote_file) + res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + "agentId": agent_ref, + "workingDir": ".", + "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] + }) + data = res.json() + if data["error"] is not None: + if str(data["error"]) == "Agent not found": + port = self.default_sftp_port() + fp = os.path.join(remote_dir, remote_file) + sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) + path = sftp_connector.get(fp, local_dir) + return path + 
else: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: + res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + data = res.json() + if data["available"]: + content = data["responseString"] + import base64 + content = base64.b64decode(content) + path = Path(local_dir) / remote_file + with open(path, "wb") as f: + f.write(content) + return path.as_posix() + time.sleep(1) + + # file manager service fallback + assert process_id is not None, f"Expected process_id, got {process_id}" + url_path = os.path.join(process_id, remote_file) + filemgr_svc_download_url = f"{self.filemgr_svc_url()}/download/live/{url_path}" - def cat_file(self, sr_host: str, remote_file: str) -> bytes: + def cat_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: str, remote_dir: str) -> bytes: """ Download files from a remote directory of a storage resource to a local directory + TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ - host = sr_host - port = self.default_sftp_port() - sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) - data = sftp_connector.cat(remote_file) - logger.info("Remote files downlaoded to local dir: %s bytes", len(data)) - return data + import os + fp = os.path.join("/data", remote_file) + res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + "agentId": agent_ref, + "workingDir": ".", + "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] + }) + data = res.json() + if data["error"] is not None: + if str(data["error"]) == "Agent not found": + port = self.default_sftp_port() + fp = os.path.join(remote_dir, remote_file) + sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) + data = sftp_connector.cat(fp) + return data + else: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: 
+ res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + data = res.json() + if data["available"]: + content = data["responseString"] + import base64 + content = base64.b64decode(content) + return content + time.sleep(1) + + # file manager service fallback + assert process_id is not None, f"Expected process_id, got {process_id}" + url_path = os.path.join(process_id, remote_file) + filemgr_svc_download_url = f"{self.filemgr_svc_url()}/download/live/{url_path}" def launch_experiment( self, experiment_name: str, app_name: str, - inputs: dict[str, str | int | float | list[str]], + inputs: dict[str, dict[str, str | int | float | list[str]]], computation_resource_name: str, queue_name: str, node_count: int, @@ -402,24 +539,43 @@ class AiravataOperator: sr_host = str(sr_host or self.default_sr_hostname()) mount_point = Path(self.default_gateway_data_store_dir()) / self.user_id project_name = str(project_name or self.default_project_name()) + agent_ref = str(uuid.uuid4()) + server_url = urlparse(self.connection_svc_url()).netloc # validate args (str) print("[AV] Validating args...") - assert len(experiment_name) > 0 - assert len(app_name) > 0 - assert len(computation_resource_name) > 0 - assert len(inputs) > 0 - assert len(gateway_id) > 0 - assert len(queue_name) > 0 - assert len(grp_name) > 0 - assert len(sr_host) > 0 - assert len(project_name) > 0 - assert len(mount_point.as_posix()) > 0 + assert len(experiment_name) > 0, f"Invalid experiment_name: {experiment_name}" + assert len(app_name) > 0, f"Invalid app_name: {app_name}" + assert len(computation_resource_name) > 0, f"Invalid computation_resource_name: {computation_resource_name}" + assert len(inputs) > 0, f"Invalid inputs: {inputs}" + assert len(gateway_id) > 0, f"Invalid gateway_id: {gateway_id}" + assert len(queue_name) > 0, f"Invalid queue_name: {queue_name}" + assert len(grp_name) > 0, f"Invalid grp_name: {grp_name}" + assert len(sr_host) > 0, f"Invalid sr_host: {sr_host}" + 
assert len(project_name) > 0, f"Invalid project_name: {project_name}" + assert len(mount_point.as_posix()) > 0, f"Invalid mount_point: {mount_point}" # validate args (int) - assert node_count > 0 - assert cpu_count > 0 - assert walltime > 0 + assert node_count > 0, f"Invalid node_count: {node_count}" + assert cpu_count > 0, f"Invalid cpu_count: {cpu_count}" + assert walltime > 0, f"Invalid walltime: {walltime}" + + # parse and validate inputs + file_inputs = dict[str, Path | list[Path]]() + data_inputs = dict[str, str | int | float]() + for input_name, input_spec in inputs.items(): + input_type = input_spec["type"] + input_value = input_spec["value"] + if input_type == "uri": + assert isinstance(input_value, str) and os.path.isfile(str(input_value)), f"Invalid {input_name}: {input_value}" + file_inputs[input_name] = Path(input_value) + elif input_type == "uri[]": + assert isinstance(input_value, list) and all([os.path.isfile(str(v)) for v in input_value]), f"Invalid {input_name}: {input_value}" + file_inputs[input_name] = [Path(v) for v in input_value] + else: + assert isinstance(input_value, (int, float, str)), f"Invalid {input_name}: {input_value}" + data_inputs[input_name] = input_value + data_inputs.update({"agent_id": agent_ref, "server_url": server_url}) # setup runtime params print("[AV] Setting up runtime params...") @@ -429,7 +585,7 @@ class AiravataOperator: # setup application interface print("[AV] Setting up application interface...") app_interface_id = self.get_app_interface_id(app_name) - assert app_interface_id is not None + assert app_interface_id is not None, f"Invalid app_interface_id: {app_interface_id}" # setup experiment print("[AV] Setting up experiment...") @@ -464,45 +620,37 @@ class AiravataOperator: auto_schedule=auto_schedule, ) - # set up file inputs - print("[AV] Setting up file inputs...") - def register_input_file(file: Path) -> str: return str(self.register_input_file(file.name, sr_host, sr_id, gateway_id, file.name, abs_path)) - - # 
setup experiment inputs + + # set up file inputs + print("[AV] Setting up file inputs...") files_to_upload = list[Path]() - file_inputs = dict[str, str | list[str]]() - data_inputs = dict[str, str | list[str] | int | float]() - for key, value in inputs.items(): - - if isinstance(value, str) and Path(value).is_file(): - file = Path(value) - files_to_upload.append(file) - file_inputs[key] = register_input_file(file) - - elif isinstance(value, list) and all([isinstance(v, str) and Path(v).is_file() for v in value]): - files = [*map(Path, value)] - files_to_upload.extend(files) - file_inputs[key] = [*map(register_input_file, files)] - + file_refs = dict[str, str | list[str]]() + for key, value in file_inputs.items(): + if isinstance(value, Path): + files_to_upload.append(value) + file_refs[key] = register_input_file(value) + elif isinstance(value, list): + assert all([isinstance(v, Path) for v in value]), f"Invalid file input value: {value}" + files_to_upload.extend(value) + file_refs[key] = [*map(register_input_file, value)] else: - data_inputs[key] = value - - # configure file inputs for experiment - print(f"[AV] Uploading {len(files_to_upload)} file inputs for experiment...") - self.upload_files(storage.hostName, files_to_upload, exp_dir) + raise ValueError("Invalid file input type") # configure experiment inputs experiment_inputs = [] for exp_input in self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id): # type: ignore if exp_input.type < 3 and exp_input.name in data_inputs: value = data_inputs[exp_input.name] - exp_input.value = repr(value) - elif exp_input.type == 3 and exp_input.name in file_inputs: - exp_input.value = file_inputs[exp_input.name] - elif exp_input.type == 4 and exp_input.name in file_inputs: - exp_input.value = ','.join(file_inputs[exp_input.name]) + if exp_input.type == 0: + exp_input.value = str(value) + else: + exp_input.value = repr(value) + elif exp_input.type == 3 and exp_input.name in file_refs: + 
exp_input.value = file_refs[exp_input.name] + elif exp_input.type == 4 and exp_input.name in file_refs: + exp_input.value = ','.join(file_refs[exp_input.name]) experiment_inputs.append(exp_input) experiment.experimentInputs = experiment_inputs @@ -510,31 +658,81 @@ class AiravataOperator: outputs = self.api_server_client.get_application_outputs(self.airavata_token, app_interface_id) experiment.experimentOutputs = outputs + # upload file inputs for experiment + print(f"[AV] Uploading {len(files_to_upload)} file inputs for experiment...") + self.upload_files(None, None, storage.hostName, files_to_upload, exp_dir) + # create experiment ex_id = self.api_server_client.create_experiment(self.airavata_token, gateway_id, experiment) - - # TODO agent_id generate and send as input parameter - # connect to connection service after this point, and route all file-related requests through it - # later build a ssh adapter for ls type tasks + ex_id = str(ex_id) + print(f"[AV] Experiment {experiment_name} CREATED with id: {ex_id}") # launch experiment self.api_server_client.launch_experiment(self.airavata_token, ex_id, gateway_id) + print(f"[AV] Experiment {experiment_name} STARTED with id: {ex_id}") + + # get process id + print(f"[AV] Experiment {experiment_name} WAITING until experiment begins...") + process_id = None + while process_id is None: + try: + process_id = self.get_process_id(ex_id) + except: + time.sleep(2) + else: + time.sleep(2) + print(f"[AV] Experiment {experiment_name} EXECUTING with pid: {process_id}") return LaunchState( - experiment_id=str(ex_id), + experiment_id=ex_id, + agent_ref=agent_ref, + process_id=process_id, mount_point=mount_point, experiment_dir=exp_dir, - sr_host=str(storage.hostName), + sr_host=storage.hostName, ) - - def get_experiment_status(self, experiment_id) -> Literal["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"]: + def get_experiment_status(self, experiment_id: str) -> 
Literal["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"]: states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"] status: any = self.api_server_client.get_experiment_status(self.airavata_token, experiment_id) # type: ignore return states[status.state] - - def stop_experiment(self, experiment_id): + def stop_experiment(self, experiment_id: str): status = self.api_server_client.terminate_experiment( self.airavata_token, experiment_id, self.default_gateway_id()) return status + + def execute_py(self, libraries: list[str], code: str, agent_ref: str) -> str | None: + print(f"[av] Executing Python Code...") + try: + res = requests.post(f"{self.connection_svc_url()}/agent/executepythonrequest", json={ + "libraries": libraries, + "code": code, + "pythonVersion": "3.10", # TODO verify + "keepAlive": False, # TODO verify + "parentExperimentId": "/data", # the working directory + "agentId": agent_ref, + }) + data = res.json() + if data["error"] is not None: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: + res = requests.get(f"{self.connection_svc_url()}/agent/executepythonresponse/{exc_id}") + data = res.json() + if data["available"]: + response = str(data["responseString"]) + return response + time.sleep(1) + except Exception as e: + print("[av] Remote execution failed! 
{e}") + return None + + def get_available_runtimes(self): + from .runtime import Remote + return [ + Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=10, walltime=30), + Remote(cluster="login.expanse.sdsc.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=10, walltime=30), + Remote(cluster="anvil.rcac.purdue.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=24, walltime=30), + ] diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py index 8fe1865d1b..1967cf6f73 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py @@ -65,7 +65,7 @@ class Experiment(Generic[T], abc.ABC): name: str application: T inputs: dict[str, Any] - input_mapping: dict[str, str] + input_mapping: dict[str, tuple[Any, str]] resource: Runtime = Runtime.default() tasks: list[Task] = [] @@ -115,7 +115,7 @@ class Experiment(Generic[T], abc.ABC): task_specific_params = dict(zip(space.keys(), values)) agg_inputs = {**self.inputs, **task_specific_params} - task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()} + task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in self.input_mapping.items()} self.tasks.append(Task( name=f"{self.name}_{uuid_str}", @@ -130,6 +130,6 @@ class Experiment(Generic[T], abc.ABC): tasks = [] for t in self.tasks: agg_inputs = {**self.inputs, **t.inputs} - task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()} + task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in self.input_mapping.items()} tasks.append(Task(name=t.name, app_id=self.application.app_id, inputs=task_inputs, runtime=t.runtime)) return Plan(tasks=tasks) diff --git 
a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py index e5e30c92b2..b68b1f7acb 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py @@ -58,19 +58,19 @@ class NAMD(ExperimentApp): num_replicas=num_replicas, ) obj.input_mapping = { - "MD-Instructions-Input": "config_file", # uri? [REQUIRED] - "Coordinates-PDB-File": "pdb_file", # uri? [OPTIONAL] - "Protein-Structure-File_PSF": "psf_file", # uri? [REQUIRED] - "FF-Parameter-Files": "ffp_files", # uri[]? [REQUIRED] - "Execution_Type": "parallelism", # "CPU" | "GPU" [REQUIRED] - "Optional_Inputs": "other_files", # uri[]? [OPTIONAL] - "Number of Replicas": "num_replicas", # integer [REQUIRED] - # "Constraints-PDB": "pdb_file", # uri? [OPTIONAL] - # "Replicate": None, # "yes"? [OPTIONAL] - # "Continue_from_Previous_Run?": None, # "yes"? [OPTIONAL] - # "Previous_JobID": None, # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] - # "GPU Resource Warning": None, # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] - # "Restart_Replicas_List": None, # string [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] + "MD-Instructions-Input": ("config_file", "uri"), # uri? [REQUIRED] + "Coordinates-PDB-File": ("pdb_file", "uri"), # uri? [OPTIONAL] + "Protein-Structure-File_PSF": ("psf_file", "uri"), # uri? [REQUIRED] + "FF-Parameter-Files": ("ffp_files", "uri[]"), # uri[]? [REQUIRED] + "Execution_Type": ("parallelism", "str"), # "CPU" | "GPU" [REQUIRED] + "Optional_Inputs": ("other_files", "uri[]"), # uri[]? [OPTIONAL] + "Number of Replicas": ("num_replicas", "str"), # integer [REQUIRED] + # "Constraints-PDB": ("pdb_file", "uri"), # uri? [OPTIONAL] + # "Replicate": (None, "str"), # "yes"? 
[OPTIONAL] + # "Continue_from_Previous_Run?": (None, "str"), # "yes"? [OPTIONAL] + # "Previous_JobID": (None, "str"), # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] + # "GPU Resource Warning": (None, "str"), # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] + # "Restart_Replicas_List": (None, "str[]"), # string [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] } obj.tasks = [] return obj diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py index 78991de4e8..b6cdaa497c 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py @@ -76,16 +76,7 @@ class Plan(pydantic.BaseModel): print("Fetching results...") fps = list[list[str]]() for task in self.tasks: - runtime = task.runtime - ref = task.ref - task_dir = os.path.join(local_dir, task.name) - os.makedirs(task_dir, exist_ok=True) - fps_task = list[str]() - assert ref is not None - for remote_fp in task.ls(): - fp = runtime.download(remote_fp, task_dir, task) - fps_task.append(fp) - fps.append(fps_task) + fps.append(task.download_all(local_dir)) print("Results fetched.") self.save_json(os.path.join(local_dir, "plan.json")) return fps @@ -101,26 +92,26 @@ class Plan(pydantic.BaseModel): def status(self) -> None: statuses = self.__stage_status__() + print(f"Plan {self.id} ({len(self.tasks)} tasks):") for task, status in zip(self.tasks, statuses): - print(f"{task.name}: {status}") + print(f"* {task.name}: {status}") - def join(self, check_every_n_mins: float = 0.1) -> None: + def wait_for_completion(self, check_every_n_mins: float = 0.1) -> None: n = len(self.tasks) try: with Progress() as progress: - pbars = [progress.add_task(f"{task.name} ({i+1}/{n})", total=None) for i, task in enumerate(self.tasks)] - completed = 
[False] * n - while not all(completed): + pbars = [progress.add_task(f"{task.name} ({i+1}/{n}): CHECKING", total=None) for i, task in enumerate(self.tasks)] + while True: + completed = [False] * n statuses = self.__stage_status__() - for i, (task, status) in enumerate(zip(self.tasks, statuses)): - pbar = pbars[i] - progress.update(pbar, description=f"{task.name} ({i+1}/{n}): {status}") - if is_terminal_state(status): - completed[i] = True - progress.update(pbar, completed=True) + for i, (task, status, pbar) in enumerate(zip(self.tasks, statuses, pbars)): + completed[i] = is_terminal_state(status) + progress.update(pbar, description=f"{task.name} ({i+1}/{n}): {status}", completed=completed[i], refresh=True) + if all(completed): + break sleep_time = check_every_n_mins * 60 time.sleep(sleep_time) - print("Task(s) complete.") + print("All tasks completed.") except KeyboardInterrupt: print("Interrupted by user.") diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py index 16f5c41dc1..0633e4d76b 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py @@ -20,19 +20,10 @@ from typing import Any from pathlib import Path import pydantic -import requests -import uuid -import time # from .task import Task Task = Any -def is_terminal_state(x): return x in ["CANCELED", "COMPLETED", "FAILED"] - - -conn_svc_url = "api.gateway.cybershuttle.org" - - class Runtime(abc.ABC, pydantic.BaseModel): id: str @@ -145,53 +136,42 @@ class Remote(Runtime): def execute(self, task: Task) -> None: assert task.ref is None assert task.agent_ref is None + assert {"cluster", "queue_name", "node_count", "cpu_count", "walltime"}.issubset(self.args.keys()) + print(f"[Remote] Creating Experiment: name={task.name}") from .airavata import 
AiravataOperator av = AiravataOperator(context.access_token) - print(f"[Remote] Experiment Created: name={task.name}") - assert "cluster" in self.args - task.agent_ref = str(uuid.uuid4()) - launch_state = av.launch_experiment( - experiment_name=task.name, - app_name=task.app_id, - inputs={**task.inputs, "agent_id": task.agent_ref, "server_url": conn_svc_url}, - computation_resource_name=str(self.args["cluster"]), - queue_name=str(self.args["queue_name"]), - node_count=int(self.args["node_count"]), - cpu_count=int(self.args["cpu_count"]), - walltime=int(self.args["walltime"]), - ) - task.ref = launch_state.experiment_id - task.workdir = launch_state.experiment_dir - task.sr_host = launch_state.sr_host - print(f"[Remote] Experiment Launched: id={task.ref}") + try: + launch_state = av.launch_experiment( + experiment_name=task.name, + app_name=task.app_id, + inputs=task.inputs, + computation_resource_name=str(self.args["cluster"]), + queue_name=str(self.args["queue_name"]), + node_count=int(self.args["node_count"]), + cpu_count=int(self.args["cpu_count"]), + walltime=int(self.args["walltime"]), + ) + task.agent_ref = launch_state.agent_ref + task.pid = launch_state.process_id + task.ref = launch_state.experiment_id + task.workdir = launch_state.experiment_dir + task.sr_host = launch_state.sr_host + print(f"[Remote] Experiment Launched: id={task.ref}") + except Exception as e: + print(f"[Remote] Failed to launch experiment: {e}") + raise e def execute_py(self, libraries: list[str], code: str, task: Task) -> None: + assert task.ref is not None + assert task.agent_ref is not None print(f"* Packages: {libraries}") print(f"* Code:\n{code}") - try: - res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executepythonrequest", json={ - "libraries": libraries, - "code": code, - "pythonVersion": "3.10", # TODO verify - "keepAlive": False, # TODO verify - "parentExperimentId": "/data", # the working directory - "agentId": task.agent_ref, - }) - data = res.json() - if 
data["error"] is not None: - raise Exception(data["error"]) - else: - exc_id = data["executionId"] - while True: - res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executepythonresponse/{exc_id}") - data = res.json() - if data["available"]: - response = data["responseString"] - return print(response) - time.sleep(1) - except Exception as e: - print(f"\nRemote execution failed! {e}") + + from .airavata import AiravataOperator + av = AiravataOperator(context.access_token) + result = av.execute_py(libraries, code, task.agent_ref) + print(result) def status(self, task: Task): assert task.ref is not None @@ -205,149 +185,72 @@ class Remote(Runtime): def signal(self, signal: str, task: Task) -> None: assert task.ref is not None assert task.agent_ref is not None - + from .airavata import AiravataOperator av = AiravataOperator(context.access_token) av.stop_experiment(task.ref) def ls(self, task: Task) -> list[str]: assert task.ref is not None + assert task.pid is not None assert task.agent_ref is not None assert task.sr_host is not None assert task.workdir is not None from .airavata import AiravataOperator av = AiravataOperator(context.access_token) - - res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest", json={ - "agentId": task.agent_ref, - "workingDir": ".", - "arguments": ["ls", "/data"] - }) - data = res.json() - if data["error"] is not None: - if str(data["error"]) == "Agent not found": - return av.list_files(task.sr_host, task.workdir) - else: - raise Exception(data["error"]) - else: - exc_id = data["executionId"] - while True: - res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}") - data = res.json() - if data["available"]: - files = data["responseString"].split("\n") - return files - time.sleep(1) + files = av.list_files(task.pid, task.agent_ref, task.sr_host, task.workdir) + return files def upload(self, file: Path, task: Task) -> str: assert task.ref is not None + assert task.pid is not 
None assert task.agent_ref is not None assert task.sr_host is not None assert task.workdir is not None - import os from .airavata import AiravataOperator av = AiravataOperator(context.access_token) - - res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest", json={ - "agentId": task.agent_ref, - "workingDir": ".", - "arguments": ["cat", os.path.join("/data", file)] - }) - data = res.json() - if data["error"] is not None: - if str(data["error"]) == "Agent not found": - return av.upload_files(task.sr_host, [file], task.workdir).pop() - else: - raise Exception(data["error"]) - else: - exc_id = data["executionId"] - while True: - res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}") - data = res.json() - if data["available"]: - files = data["responseString"] - return files - time.sleep(1) + result = av.upload_files(task.pid, task.agent_ref, task.sr_host, [file], task.workdir).pop() + return result def download(self, file: str, local_dir: str, task: Task) -> str: assert task.ref is not None + assert task.pid is not None assert task.agent_ref is not None assert task.sr_host is not None assert task.workdir is not None - import os from .airavata import AiravataOperator av = AiravataOperator(context.access_token) - - res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest", json={ - "agentId": task.agent_ref, - "workingDir": ".", - "arguments": ["cat", os.path.join("/data", file)] - }) - data = res.json() - if data["error"] is not None: - if str(data["error"]) == "Agent not found": - return av.download_file(task.sr_host, os.path.join(task.workdir, file), local_dir) - else: - raise Exception(data["error"]) - else: - exc_id = data["executionId"] - while True: - res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}") - data = res.json() - if data["available"]: - content = data["responseString"] - path = Path(local_dir) / Path(file).name - with open(path, 
"w") as f: - f.write(content) - return path.as_posix() - time.sleep(1) + result = av.download_file(task.pid, task.agent_ref, task.sr_host, file, task.workdir, local_dir) + return result def cat(self, file: str, task: Task) -> bytes: assert task.ref is not None + assert task.pid is not None assert task.agent_ref is not None assert task.sr_host is not None assert task.workdir is not None - import os from .airavata import AiravataOperator av = AiravataOperator(context.access_token) - - res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest", json={ - "agentId": task.agent_ref, - "workingDir": ".", - "arguments": ["cat", os.path.join("/data", file)] - }) - data = res.json() - if data["error"] is not None: - if str(data["error"]) == "Agent not found": - return av.cat_file(task.sr_host, os.path.join(task.workdir, file)) - else: - raise Exception(data["error"]) - else: - exc_id = data["executionId"] - while True: - res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{exc_id}") - data = res.json() - if data["available"]: - content = str(data["responseString"]).encode() - return content - time.sleep(1) + content = av.cat_file(task.pid, task.agent_ref, task.sr_host, file, task.workdir) + return content @staticmethod def default(): - return Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=24, walltime=30) - + return list_runtimes(cluster="login.expanse.sdsc.edu", category="gpu").pop() + def list_runtimes( cluster: str | None = None, category: str | None = None, ) -> list[Runtime]: - all_runtimes = list[Runtime]([ - Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=10, walltime=30), - Remote(cluster="login.expanse.sdsc.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=10, walltime=30), - Remote(cluster="anvil.rcac.purdue.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=24, 
walltime=30), - ]) - return [*filter(lambda r: (cluster in [None, r.args["cluster"]]) and (category in [None, r.args["category"]]), all_runtimes)] \ No newline at end of file + from .airavata import AiravataOperator + av = AiravataOperator(context.access_token) + all_runtimes = av.get_available_runtimes() + return [*filter(lambda r: (cluster in [None, r.args["cluster"]]) and (category in [None, r.args["category"]]), all_runtimes)] + +def is_terminal_state(x): + return x in ["CANCELED", "COMPLETED", "FAILED"] \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py index a42ebea4e9..1612f44f74 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py @@ -17,6 +17,7 @@ from __future__ import annotations from typing import Any import pydantic from .runtime import Runtime +from rich.progress import Progress class Task(pydantic.BaseModel): @@ -25,6 +26,7 @@ class Task(pydantic.BaseModel): inputs: dict[str, Any] runtime: Runtime ref: str | None = pydantic.Field(default=None) + pid: str | None = pydantic.Field(default=None) agent_ref: str | None = pydantic.Field(default=None) workdir: str | None = pydantic.Field(default=None) sr_host: str | None = pydantic.Field(default=None) @@ -70,6 +72,21 @@ class Task(pydantic.BaseModel): Path(local_dir).mkdir(parents=True, exist_ok=True) return self.runtime.download(file, local_dir, self) + def download_all(self, local_dir: str) -> list[str]: + assert self.ref is not None + import os + os.makedirs(local_dir, exist_ok=True) + fps_task = list[str]() + files = self.ls() + with Progress() as progress: + pbar = progress.add_task(f"Downloading: ...", total=len(files)) + for remote_fp in self.ls(): + fp = self.runtime.download(remote_fp, local_dir, self) + 
progress.update(pbar, description=f"Downloading: {remote_fp}", advance=1) + fps_task.append(fp) + progress.update(pbar, description=f"Downloading: DONE", refresh=True) + return fps_task + def cat(self, file: str) -> bytes: assert self.ref is not None return self.runtime.cat(file, self) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml index 14b3f9653f..c4b9f9cf42 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "airavata-python-sdk-test" -version = "0.0.6.post3" +version = "0.0.10.post1" description = "Apache Airavata Python SDK" readme = "README.md" license = { text = "Apache License 2.0" } @@ -14,7 +14,7 @@ dependencies = [ "oauthlib", "requests", "requests-oauthlib", - "thrift~=0.21.0", + "thrift", "thrift_connector", "paramiko", "scp", diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb deleted file mode 100644 index 8b1fc79325..0000000000 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb +++ /dev/null @@ -1,384 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Cybershuttle SDK - Molecular Dynamics\n", - "> Define, run, monitor, and analyze molecular dynamics experiments in a HPC-agnostic way.\n", - "\n", - "This notebook shows how users can setup and launch a **NAMD** experiment with replicas, monitor its execution, and run analyses both during and after execution." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Installing Required Packages\n", - "\n", - "First, install the `airavata-python-sdk-test` package from the pip repository." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%pip install -e airavata-api/airavata-client-sdks/airavata-python-sdk" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Importing the SDK" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%cd airavata-api/airavata-client-sdks/airavata-python-sdk/samples\n", - "import airavata_experiments as ae\n", - "from airavata_experiments import md" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Authenticating\n", - "\n", - "To authenticate for remote execution, call the `ae.login()` method.\n", - "This method will prompt you to enter your credentials and authenticate your session." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ae.login()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user has access to." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "runtimes = ae.list_runtimes()\n", - "ae.display(runtimes)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Uploading Experiment Files\n", - "\n", - "Drag and drop experiment files onto the workspace that this notebook is run on.\n", - "\n", - "```bash\n", - "(sh) $: tree data\n", - "data\n", - "├── b4pull.pdb\n", - "├── b4pull.restart.coor\n", - "├── b4pull.restart.vel\n", - "├── b4pull.restart.xsc\n", - "├── par_all36_water.prm\n", - "├── par_all36m_prot.prm\n", - "├── pull.conf\n", - "├── structure.pdb\n", - "└── structure.psf\n", - "\n", - "1 directory, 9 files\n", - "\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Defining a NAMD Experiment\n", - "\n", - "The `md.NAMD.initialize()` is used to define a NAMD experiment.\n", - "Here, provide the paths to the `.conf` file, the `.pdb` file, the `.psf` file, any optional files you want to run NAMD on.\n", - "You can preview the function definition through auto-completion.\n", - "\n", - "```python\n", - "def initialize(\n", - " name: str,\n", - " config_file: str,\n", - " pdb_file: str,\n", - " psf_file: str,\n", - " ffp_files: list[str],\n", - " other_files: list[str] = [],\n", - " parallelism: Literal['CPU', 'GPU'] = \"CPU\",\n", - " num_replicas: int = 1\n", - ") -> Experiment[ExperimentApp]\n", - "```\n", - "\n", - "To add replica runs, simply call the `exp.add_replica()` function.\n", - "You can call the `add_replica()` function as many times as you want replicas.\n", - "Any optional resource constraint can be provided here.\n", - "\n", - "You can also call `ae.display()` to pretty-print the experiment." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "exp = md.NAMD.initialize(\n", - " name=\"yasith_namd_experiment\",\n", - " config_file=\"data/pull.conf\",\n", - " pdb_file=\"data/structure.pdb\",\n", - " psf_file=\"data/structure.psf\",\n", - " ffp_files=[\n", - " \"data/par_all36_water.prm\",\n", - " \"data/par_all36m_prot.prm\"\n", - " ],\n", - " other_files=[\n", - " \"data/b4pull.pdb\",\n", - " \"data/b4pull.restart.coor\",\n", - " \"data/b4pull.restart.vel\",\n", - " \"data/b4pull.restart.xsc\",\n", - " ],\n", - " parallelism=\"GPU\",\n", - ")\n", - "exp.add_replica(*ae.list_runtimes(cluster=\"login.expanse.sdsc.edu\", category=\"gpu\"))\n", - "ae.display(exp)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Creating an Execution Plan\n", - "\n", - "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan = exp.plan()\n", - "ae.display(plan)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Saving the Plan\n", - "\n", - "A created plan can be saved locally (in JSON) or remotely (in a user-local DB) for later reference." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.save() # this will save the plan in DB\n", - "plan.save_json(\"plan.json\") # save the plan state locally" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Launching the Plan\n", - "\n", - "A created plan can be launched using the `plan.launch()` function.\n", - "Changes to plan states will be automatically saved onto the remote.\n", - "However, plan state can also be tracked locally by invoking `plan.save_json()`." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.launch()\n", - "plan.save_json(\"plan.json\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Loading a Saved Plan\n", - "\n", - "A saved plan can be loaded by calling `ae.plan.load_json(plan_path)` (for local plans) or `ae.plan.load(plan_id)` (for remote plans)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan = ae.plan.load_json(\"plan.json\")\n", - "plan = ae.plan.load(plan.id)\n", - "ae.display(plan)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Fetching User-Defined Plans\n", - "\n", - "The `ae.plan.query()` function retrieves all plans stored in the remote." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plans = ae.plan.query()\n", - "ae.display(plans)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Checking Plan Status\n", - "\n", - "The plan's execution status can be checked by calling `plan.status()`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.status()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Managing Plan Execution\n", - "\n", - "The `plan.stop()` function will stop a currently executing plan.\n", - "The `plan.join()` function would block code execution until the plan completes its execution." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.stop()\n", - "plan.join()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Running File Operations" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Displaying the status and files generated by each replica (task)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "for task in plan.tasks:\n", - " status = task.status()\n", - " print(status)\n", - " # task.upload(\"data/sample.txt\")\n", - " files = task.ls()\n", - " display(files)\n", - " display(task.cat(\"NAMD.stderr\"))\n", - " # task.download(\"NAMD.stdout\", \"./results\")\n", - " task.download(\"NAMD_Repl_1.out\", \"./results\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Displaying the intermediate results generated by each replica (task)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "for index, task in enumerate(plan.tasks):\n", - "\n", - " @task.context(packages=[\"matplotlib\", \"pandas\"])\n", - " def analyze(x, y, index, num_tasks) -> None:\n", - " from matplotlib import pyplot as plt\n", - " import pandas as pd\n", - " df = pd.read_csv(\"data.csv\")\n", - " plt.figure(figsize=(x, y))\n", - " plt.plot(df[\"x\"], df[\"y\"], marker=\"o\", linestyle=\"-\", linewidth=2, markersize=6)\n", - " plt.title(f\"Plot for Replica {index} of {num_tasks}\")\n", - "\n", - " analyze(3, 4, index+1, len(plan.tasks))" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "airavata", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.7" 
- } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/dev-tools/deployment/scripts/.gitkeep b/dev-tools/deployment/scripts/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh b/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh new file mode 100755 index 0000000000..d752b6baa8 --- /dev/null +++ b/dev-tools/deployment/scripts/expanse/alphafold2-agent.sh @@ -0,0 +1,133 @@ +#!/bin/bash -x + +# ##################################################################### +# AlphaFold2 Driver + Airavata Agent for Expanse +# ##################################################################### +# +# ---------------------------------------------------------------------- +# CONTRIBUTORS +# ---------------------------------------------------------------------- +# * Sudhakar Pamidigantham +# * Lahiru Jayathilake +# * Dimuthu Wannipurage +# * Yasith Jayawardana +# +# ###################################################################### + +######################################################################## +# Part 1 - Housekeeping +######################################################################## + +#----------------------------------------------------------------------- +# Step 1.1 - Check command line +#----------------------------------------------------------------------- + +while getopts t:p:m: option; do + case $option in + t) MaxDate=$OPTARG ;; + p) MODEL_PRESET=$OPTARG ;; + m) Num_Multi=$OPTARG ;; + \?) cat <<ENDCAT ;; +>! Usage: $0 [-t Maximum Template Date ] !< +>! [-p Model Preset ] !< +>! 
[-m Number of Multimers per Model ] !< +ENDCAT + # exit 1 ;; + esac +done + +if [ $Num_Multi = "" ]; then + export Num_Multi=1 +fi +#set the environment PATH +export PYTHONNOUSERSITE=True +module reset +module load singularitypro +ALPHAFOLD_DATA_PATH=/expanse/projects/qstore/data/alphafold-v2.3.2 +ALPHAFOLD_MODELS=/expanse/projects/qstore/data/alphafold-v2.3.2/params + +#ALPHAFOLD_DATA_PATH=/expanse/projects/qstore/data/alphafold +#ALPHAFOLD_MODELS=/expanse/projects/qstore/data/alphafold/params +pdb70="" +uniprot="" +pdbseqres="" +nummulti="" + +# check_flags +if [ "monomer" = "${MODEL_PRESET%_*}" ]; then + export pdb70="--pdb70_database_path=/data/pdb70/pdb70" +else + export uniprot="--uniprot_database_path=/data/uniprot/uniprot.fasta" + export pdbseqres="--pdb_seqres_database_path=/data/pdb_seqres/pdb_seqres.txt" + export nummulti="--num_multimer_predictions_per_model=$Num_Multi" +fi + +## Copy input to node local scratch +cp input.fasta /scratch/$USER/job_$SLURM_JOBID +#cp -r /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08 /scratch/$USER/job_$SLURM_JOBID/ +cd /scratch/$USER/job_$SLURM_JOBID +ln -s /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08 +mkdir bfd +cp /expanse/projects/qstore/data/alphafold/bfd/*index bfd/ +#cp /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata bfd/ +#cp /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata bfd/ +cd bfd +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_a3m.ffdata +cd ../ +mkdir alphafold_output +# Create soft links ro rundir form submitdir + +ln -s 
/scratch/$USER/job_$SLURM_JOBID $SLURM_SUBMIT_DIR/rundir + +#Run the command +singularity run --nv \ + -B /expanse/lustre \ + -B /expanse/projects \ + -B /scratch \ + -B $ALPHAFOLD_DATA_PATH:/data \ + -B $ALPHAFOLD_MODELS \ + /cm/shared/apps/containers/singularity/alphafold/alphafold_aria2_v2.3.2.simg \ + --fasta_paths=/scratch/$USER/job_$SLURM_JOBID/input.fasta \ + --uniref90_database_path=/data/uniref90/uniref90.fasta \ + --data_dir=/data \ + --mgnify_database_path=/data/mgnify/mgy_clusters_2022_05.fa \ + --bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt \ + --uniref30_database_path=/data/uniref30/UniRef30_2021_03 \ + $pdbseqres \ + $pdb70 \ + $uniprot \ + --template_mmcif_dir=/data/pdb_mmcif/mmcif_files \ + --obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \ + --output_dir=/scratch/$USER/job_$SLURM_JOBID/alphafold_output \ + --max_template_date=$MaxDate \ + --model_preset=$MODEL_PRESET \ + --use_gpu_relax=true \ + --models_to_relax=best \ + $nummulti + +#-B .:/etc \ +#/cm/shared/apps/containers/singularity/alphafold/alphafold.sif \ +#--fasta_paths=input.fasta \ +#--uniref90_database_path=/data/uniref90/uniref90.fasta \ +#--data_dir=/data \ +#--mgnify_database_path=/data/mgnify/mgy_clusters.fa \ +#--bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt \ +#--uniclust30_database_path=/scratch/$USER/job_$SLURM_JOBID/uniclust30_2018_08/uniclust30_2018_08 \ +#--pdb70_database_path=/data/pdb70/pdb70 \ +#--template_mmcif_dir=/data/pdb_mmcif/mmcif_files \ +#--obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \ +#--output_dir=alphafold_output \ +#--max_template_date=$MaxDate \ +#--preset=$MODEL_PRESET + +#make a user choice --preset=casp14 +#make this a user choice --max_template_date=2020-05-14 \ +# --model_names='model_1' \ +# Remove model data +unlink $SLURM_SUBMIT_DIR/rundir + +### Copy back results + +tar -cvf $SLURM_SUBMIT_DIR/alphafold_output.tar 
alphafold_output diff --git a/dev-tools/deployment/scripts/expanse/namd-agent.sh b/dev-tools/deployment/scripts/expanse/namd-agent.sh new file mode 100755 index 0000000000..b2a7301822 --- /dev/null +++ b/dev-tools/deployment/scripts/expanse/namd-agent.sh @@ -0,0 +1,232 @@ +#!/bin/bash -x + +# ##################################################################### +# NAMD Driver + Airavata Agent for Expanse +# ##################################################################### +# +# ---------------------------------------------------------------------- +# CONTRIBUTORS +# ---------------------------------------------------------------------- +# * Sudhakar Pamidigantham +# * Diego Gomes +# * Lahiru Jayathilake +# * Yasith Jayawardana +# +# ---------------------------------------------------------------------- +# CHANGELOG +# ---------------------------------------------------------------------- +# * 2024/12/13 - Agent subprocess and graceful shutdown (Yasith J) +# * 2024/12/09 - Reviewed (Diego Gomes) +# ###################################################################### + +######################################################################## +# Part 1 - Housekeeping +######################################################################## + +#----------------------------------------------------------------------- +# Step 1.1 - Check command line +#----------------------------------------------------------------------- +if [ $# -lt 1 -o $# -gt 15 ]; then + echo 1>&2 "Usage: $0 -t [CPU/GPU] -r [PJobID] -l [Continue_Replicas_list] -n [Number_of_Replicas] -i input_conf [SEAGrid_UserName] " + exit 127 +fi + +# subdir depends on whether we're doing freq, water or PES. For freq and water, +# it should be hardcoded in the Xbaya workflow. For PES, it should be an +# additional array generated by the frontend. The contents of this array are +# trivial, but creating an extra Xbaya service to generate it would add +# unnecessary extra complexity. 
Besides, the frontend cannot avoid having to +# pass at least one array: the array with gjf files. + +subdir="$PWD" +while getopts t:r:l:n:i:a:s: option; do + case $option in + t) ExeTyp=$OPTARG ;; + r) PJobID=$OPTARG ;; + l) rep_list=$OPTARG ;; + n) num_rep=$OPTARG ;; + i) input=$OPTARG ;; + a) agent_id=$OPTARG ;; + s) server_url=$OPTARG ;; + \?) cat <<ENDCAT ;; +>! Usage: $0 [-et execution type cpu/gpu ] !< +>! [-rr Previous JobID for continuation (optional)] !< +>! [-rl replica list for contiuation (optional)] !< +>! [-rep Number of replicas to run (optional)] !< +ENDCAT + esac +done + +echo "ExeTyp=$ExeTyp" +echo "PJobID=$PJobID" +echo "rep_list=$rep_list" +echo "num_rep=$num_rep" +echo "input=$input" +echo "agent_id=$agent_id" +echo "server_url=$server_url" + +# ---------------------------------------------------------------------- +# RUN AGENT AS SUBPROCESS (for now) +# ---------------------------------------------------------------------- +SIF_PATH=/home/scigap/agent-framework/airavata-agent.sif +module load singularitypro +singularity exec --bind $PWD:/data $SIF_PATH bash -c "/opt/airavata-agent $server_url:19900 $agent_id" & +agent_pid=$! # save agent PID for graceful shutdown + +#----------------------------------------------------------------------- +# Step 1.2 - Validate inputs +#----------------------------------------------------------------------- + +if [ ! $AIRAVATA_USERNAME ]; then + echo "Missing AIRAVATA_USERNAME. Check with support!" + exit +fi + +if [ ! 
$ExeTyp ]; then + echo "Missing Execution Type: [CPU, GPU]" + exit +fi + +SG_UserName="$AIRAVATA_USERNAME" +echo "Execution Type: $ExeTyp" + +#----------------------------------------------------------------------- +# Step 1.3 - Get the input configuration filename +#----------------------------------------------------------------------- +filename=$(basename -- "$input") +filename="${filename%.*}" + +#----------------------------------------------------------------------- +# Step 1.4 - Copy previous files if this a continuation. +#----------------------------------------------------------------------- +if [ "$PJobID" ]; then + cp $input saveInput #save configuration + ls -lt $localarc/$PJobID/ + cp -r $localarc/$PJobID/. . + cp saveInput $input +fi + +#----------------------------------------------------------------------- +# Step 1.5 - Create folders for replicas (if necessary) +#----------------------------------------------------------------------- +echo " Creating folders for replica run(s)" +input_files=$(ls *.* | grep -v slurm) + +# Create one subdirectory per replica and copy over the inputs +for i in $(seq 1 ${num_rep}); do + if [ ! 
-d ${i} ]; then + mkdir ${i} + cp $input_files ${i}/ + fi +done + +######################################################################## +# Part 2 - Machine specific Options (SDSC-Expanse) +######################################################################## + +#----------------------------------------------------------------------- +# Step 2.1 - Load modules (SDSC-Expanse) +#----------------------------------------------------------------------- + +module purge +module load slurm/expanse/current + +if [ $ExeTyp = "CPU" ]; then + echo "Loading CPU modules" + module load cpu/0.17.3b gcc/10.2.0 openmpi/4.1.1 +fi +if [ $ExeTyp = "GPU" ]; then + echo "Loading GPU modules" + module load gpu/0.17.3b +fi + +module list + +#----------------------------------------------------------------------- +# Step 2.2 - Set NAMD binary and command line for SDSC-Expanse +#----------------------------------------------------------------------- +APP_PATH=/home/scigap/applications +if [ $ExeTyp == "CPU" ]; then + export NAMDPATH="$APP_PATH/NAMD_3.1alpha2_Linux-x86_64-multicore" +fi +if [ $ExeTyp == "GPU" ]; then + export NAMDPATH="$APP_PATH/NAMD_3.1alpha2_Linux-x86_64-multicore-CUDA" +fi + +#----------------------------------------------------------------------- +# Step 2.3 A - Run NAMD3 (CPU, Serial) +#----------------------------------------------------------------------- +# - one replica at a given time +# - each replica uses all CPUs +#----------------------------------------------------------------------- +if [ ${ExeTyp} == "CPU" ]; then + for replica in $(seq 1 ${num_rep}); do + cd ${subdir}/${replica}/ # Go to folder + + # Run NAMD3 + ${NAMDPATH}/namd3 \ + +setcpuaffinity \ + +p ${SLURM_CPUS_ON_NODE} \ + $input >${filename}.out 2>${filename}.err + done +fi + +#----------------------------------------------------------------------- +# Step 2.3 B - Run NAMD3 (GPU, Batched) +#----------------------------------------------------------------------- +# - one replica PER GPU at a 
given time
+# - each replica uses all CPUs
+#-----------------------------------------------------------------------
+if [ "${ExeTyp}" = "GPU" ]; then
+    # Number of replicas run side by side. Falls back to 1 when
+    # SLURM_GPUS_ON_NODE is not set, so the batch test below stays a
+    # well-formed integer comparison.
+    gpus_per_node="${SLURM_GPUS_ON_NODE:-1}"
+    GPU_ID=0
+    subtask_pids=()
+    for replica in $(seq 1 ${num_rep}); do
+        # Abort rather than run NAMD in the wrong folder and overwrite
+        # another replica's .out/.err files.
+        cd "${subdir}/${replica}/" || exit 1
+
+        # Run NAMD3 in background
+        # NOTE(review): every replica of a batch is started with
+        # +setcpuaffinity and +p ${SLURM_CPUS_ON_NODE}; confirm that sharing
+        # all cores between concurrently running replicas is intended.
+        "${NAMDPATH}/namd3" \
+            +setcpuaffinity \
+            +p ${SLURM_CPUS_ON_NODE} \
+            +devices ${GPU_ID} \
+            "$input" >"${filename}.out" 2>"${filename}.err" &
+
+        subtask_pids+=($!)     # Store PID of the background NAMD task
+        GPU_ID=$((GPU_ID + 1)) # Increment GPU_ID
+
+        # Wait for a batch of replicas to complete
+        if [ "${GPU_ID}" -ge "${gpus_per_node}" ]; then
+            wait "${subtask_pids[@]}" # wait for current batch to complete
+            subtask_pids=()           # clear subtask_pids of current batch
+            GPU_ID=0                  # reset gpu counter
+        fi
+    done
+
+    # Wait for the last batch of replicas to complete
+    if [ ${#subtask_pids[@]} -gt 0 ]; then
+        wait "${subtask_pids[@]}" # wait for last batch to complete
+        subtask_pids=()           # clear subtask_pids of last batch
+    fi
+fi
+
+# Once done, go back to main folder
+cd "${subdir}" || exit 1
+
+########################################################################
+# Part 3 - Output Flattening
+########################################################################
+# Move every <replica>/<name>.<ext> to <replica>_<name>.<ext> in the main
+# folder, then delete the replica folder. num_rep is deliberately NOT
+# reassigned here: the loop must cover the same replica count that was
+# used to create and run the replica folders earlier in this script.
+for replica in $(seq 1 ${num_rep}); do
+    for file in "${replica}"/*.*; do
+        [ -e "${file}" ] || continue # glob matched nothing
+        mv "${file}" "${replica}_$(basename "${file}")"
+    done
+    rm -rf "${replica}/"
+done
+
+# Send SIGTERM to agent, and wait for completion
+kill -TERM $agent_pid
+wait $agent_pid
+
+# Give it a break when jobs are done
+sleep 10
+
+# bye!
diff --git a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb index 1e5dba8f50..f89b022ecd 100644 --- a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb +++ b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb @@ -176,9 +176,10 @@ " \"data/b4pull.restart.vel\",\n", " \"data/b4pull.restart.xsc\",\n", " ],\n", - " parallelism=\"GPU\",\n", + " parallelism=\"CPU\",\n", + " num_replicas=1,\n", ")\n", - "exp.add_replica()\n", + "exp.add_replica(*ae.list_runtimes(cluster=\"login.expanse.sdsc.edu\", category=\"cpu\"))\n", "ae.display(exp)" ] }, @@ -271,6 +272,7 @@ "source": [ "plan = ae.plan.load_json(\"plan.json\")\n", "plan = ae.plan.load(plan.id)\n", + "plan.status()\n", "ae.display(plan)" ] }, @@ -294,24 +296,6 @@ "ae.display(plans)" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Checking Plan Status\n", - "\n", - "The plan's execution status can be checked by calling `plan.status()`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.status()" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -319,7 +303,7 @@ "## Managing Plan Execution\n", "\n", "The `plan.stop()` function will stop a currently executing plan.\n", - "The `plan.join()` function would block code execution until the plan completes its execution." + "The `plan.wait_for_completion()` function would block until the plan finishes executing." 
] }, { @@ -328,8 +312,8 @@ "metadata": {}, "outputs": [], "source": [ - "plan.stop()\n", - "plan.join()" + "# plan.stop()\n", + "plan.wait_for_completion()" ] }, { @@ -361,14 +345,17 @@ "outputs": [], "source": [ "for task in plan.tasks:\n", - " status = task.status()\n", - " print(status)\n", - " # task.upload(\"data/sample.txt\")\n", - " files = task.ls()\n", - " display(files)\n", - " display(task.cat(\"NAMD.stderr\"))\n", - " # task.download(\"NAMD.stdout\", \"./results\")\n", - " task.download(\"NAMD_Repl_1.out\", \"./results\")" + " print(task.name, task.pid)\n", + " # display files\n", + " display(task.ls())\n", + " # upload a file\n", + " task.upload(\"data/sample.txt\")\n", + " # preview contents of a file\n", + " display(task.cat(\"sample.txt\"))\n", + " # download a specific file\n", + " task.download(\"sample.txt\", f\"./results_{task.name}\")\n", + " # download all files\n", + " task.download_all(f\"./results_{task.name}\")" ] }, { @@ -390,17 +377,14 @@ "outputs": [], "source": [ "for index, task in enumerate(plan.tasks):\n", - "\n", - " @task.context(packages=[\"matplotlib\", \"pandas\"])\n", - " def analyze(x, y, index, num_tasks) -> None:\n", - " from matplotlib import pyplot as plt\n", - " import pandas as pd\n", - " df = pd.read_csv(\"data.csv\")\n", - " plt.figure(figsize=(x, y))\n", - " plt.plot(df[\"x\"], df[\"y\"], marker=\"o\", linestyle=\"-\", linewidth=2, markersize=6)\n", - " plt.title(f\"Plot for Replica {index} of {num_tasks}\")\n", - "\n", - " analyze(3, 4, index+1, len(plan.tasks))" + " @task.context(packages=[\"numpy\", \"pandas\"])\n", + " def analyze() -> None:\n", + " import numpy as np\n", + " with open(\"pull.conf\", \"r\") as f:\n", + " data = f.read()\n", + " print(\"pull.conf has\", len(data), \"chars\")\n", + " print(np.arange(10))\n", + " analyze()" ] } ],
