This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 3560c00c2d474fa08ca157fa24dccbd9263f5003 Author: yasith <[email protected]> AuthorDate: Mon Nov 25 06:40:08 2024 -0600 initial experiment sdk implementation --- .../airavata-python-sdk/.gitignore | 8 + .../airavata_experiments/__init__.py | 39 ++ .../airavata_experiments/airavata.py | 346 +++++++++++++++ .../airavata_experiments/auth/__init__.py | 31 ++ .../airavata_experiments/auth/device_auth.py | 134 ++++++ .../airavata_experiments/base.py | 135 ++++++ .../airavata_experiments/md/__init__.py | 19 + .../airavata_experiments/md/applications.py | 101 +++++ .../airavata_experiments/plan.py | 136 ++++++ .../airavata_experiments/runtime.py | 215 +++++++++ .../airavata_experiments/sftp.py | 126 ++++++ .../airavata_experiments/task.py | 62 +++ .../clients/sftp_file_handling_client.py | 43 +- .../clients/utils/experiment_handler_util.py | 11 +- .../airavata_sdk/transport/settings.py | 1 + .../airavata-python-sdk/pyproject.toml | 12 +- .../airavata-python-sdk/requirements.txt | 11 - .../airavata-python-sdk/samples/annotations.py | 70 +++ .../airavata-python-sdk/samples/poc.ipynb | 486 +++++++++++++++++++++ .../airavata-python-sdk/setup.cfg | 26 -- .../airavata-python-sdk/setup.py | 22 - 21 files changed, 1947 insertions(+), 87 deletions(-) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore index 37e0880890..11b22924b5 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/.gitignore @@ -4,3 +4,11 @@ airavata_python_sdk.egg-info .tox dist build +__pycache__/ +.DS_Store +.ipynb_checkpoints +*.egg-info/ +data/ +plan.json +settings*.ini +auth.state \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py new file mode 100644 index 
0000000000..298a46ad94 --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import annotations + +from . import base, md, plan +from .runtime import list_runtimes +from .auth import login, logout + + +def load_plan(path: str) -> plan.Plan: + return plan.Plan.load_json(path) + + +def task_context(task: base.Task): + def inner(func): + # take the function into the task's location + # and execute it there. then fetch the result + result = func(**task.inputs) + # and return it to the caller. + return result + + return inner + + +__all__ = ["login", "logout", "list_runtimes", "md", "task_context"] diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py new file mode 100644 index 0000000000..1dda44c55a --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py @@ -0,0 +1,346 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from pathlib import Path +from .sftp import SFTPConnector + +import jwt +from airavata.model.security.ttypes import AuthzToken +from airavata_sdk.clients.api_server_client import APIServerClient +from airavata_sdk.clients.utils.api_server_client_util import APIServerClientUtil +from airavata_sdk.clients.utils.data_model_creation_util import DataModelCreationUtil +from airavata_sdk.transport.settings import ExperimentSettings, GatewaySettings + +logger = logging.getLogger("airavata_sdk.clients") +logger.setLevel(logging.INFO) + + +class AiravataOperator: + + def __init__(self, access_token: str, config_file: str = "settings.ini"): + # store variables + self.config_file = config_file + self.access_token = access_token + # load api server settings and create client + self.api_server_client = APIServerClient(self.config_file) + # load gateway settings + self.gateway_conf = GatewaySettings(self.config_file) + gateway_id = self.gateway_conf.GATEWAY_ID + # load experiment settings + self.experiment_conf = ExperimentSettings(self.config_file) + self.airavata_token = self.__airavata_token__(self.access_token, gateway_id) + self.api_util = APIServerClientUtil(self.config_file, username=self.user_id, password="", gateway_id=gateway_id, access_token=self.access_token) + + def 
__airavata_token__(self, access_token, gateway_id): + """ + Decode access token (string) and create AuthzToken (object) + + """ + decode = jwt.decode(access_token, options={"verify_signature": False}) + self.user_id = str(decode["preferred_username"]) + claimsMap = {"userName": self.user_id, "gatewayID": gateway_id} + return AuthzToken(accessToken=self.access_token, claimsMap=claimsMap) + + def get_experiment(self, experiment_id: str): + """ + Get experiment by id + + """ + return self.api_server_client.get_experiment(self.airavata_token, experiment_id) + + def get_accessible_apps(self, gateway_id: str | None = None): + """ + Get all applications available in the gateway + + """ + # use defaults for missing values + gateway_id = gateway_id or self.gateway_conf.GATEWAY_ID + # logic + app_interfaces = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) + return app_interfaces + + def get_preferred_storage(self, gateway_id: str | None = None, storage_name: str | None = None): + """ + Get preferred storage resource + + """ + # use defaults for missing values + gateway_id = gateway_id or self.gateway_conf.GATEWAY_ID + storage_name = storage_name or self.experiment_conf.STORAGE_RESOURCE_HOST + # logic + storage_id = self.api_util.get_storage_resource_id(storage_name) + return self.api_server_client.get_gateway_storage_preference(self.airavata_token, gateway_id, storage_id) + + def get_storage(self, storage_name: str | None = None) -> any: # type: ignore + """ + Get storage resource by name + + """ + # use defaults for missing values + storage_name = storage_name or self.experiment_conf.STORAGE_RESOURCE_HOST + # logic + storage_id = self.api_util.get_storage_resource_id(storage_name) + storage = self.api_util.api_server_client.get_storage_resource(self.airavata_token, storage_id) + return storage + + def get_group_resource_profile(self, grp_name: str | None = None): + """ + Get group resource profile by name + + """ + # use defaults for 
missing values + grp_name = grp_name or self.experiment_conf.GROUP_RESOURCE_PROFILE_NAME + # logic + grp_id = self.api_util.get_group_resource_profile_id(grp_name) + grp = self.api_util.api_server_client.get_group_resource_profile(self.airavata_token, grp_id) + return grp + + def get_compatible_deployments(self, app_interface_id: str, grp_name: str | None = None): + """ + Get compatible deployments for an application interface and group resource profile + + """ + # use defaults for missing values + grp_name = grp_name or self.experiment_conf.GROUP_RESOURCE_PROFILE_NAME + # logic + grp_id = self.api_util.get_group_resource_profile_id(grp_name) + deployments = self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token, app_interface_id, grp_id) + return deployments + + def get_app_interface_id(self, app_name: str, gateway_id: str | None = None): + """ + Get application interface id by name + + """ + self.api_util.gateway_id = str(gateway_id or self.gateway_conf.GATEWAY_ID) + return self.api_util.get_execution_id(app_name) + + def get_application_inputs(self, app_interface_id: str) -> list: + """ + Get application inputs by id + + """ + return list(self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id)) # type: ignore + + def get_compute_resources_by_ids(self, resource_ids: list[str]): + """ + Get compute resources by ids + + """ + return [self.api_server_client.get_compute_resource(self.airavata_token, resource_id) for resource_id in resource_ids] + + def make_experiment_dir(self, storage_resource, project_name: str, experiment_name: str) -> str: + """ + Make experiment directory on storage resource, and return the remote path + + Return Path: /{project_name}/{experiment_name} + + """ + host = storage_resource.hostName + port = self.experiment_conf.SFTP_PORT + sftp_connector = SFTPConnector(host=host, port=port, username=self.user_id, password=self.access_token) + remote_path = 
sftp_connector.make_experiment_dir(project_name, experiment_name) + logger.info("Experiment directory created at %s", remote_path) + return remote_path + + def upload_files(self, storage_resource, files: list[Path], exp_dir: str) -> None: + """ + Upload input files to storage resource, and return the remote path + + Return Path: /{project_name}/{experiment_name} + + """ + host = storage_resource.hostName + port = self.experiment_conf.SFTP_PORT + sftp_connector = SFTPConnector(host=host, port=port, username=self.user_id, password=self.access_token) + sftp_connector.upload_files(files, exp_dir) + logger.info("Input files uploaded to %s", exp_dir) + + def launch_experiment( + self, + experiment_name: str, + app_name: str, + computation_resource_name: str, + inputs: dict[str, str | int | float | list[str]], + *, + gateway_id: str | None = None, + queue_name: str | None = None, + grp_name: str | None = None, + sr_host: str | None = None, + project_name: str | None = None, + node_count: int | None = None, + cpu_count: int | None = None, + walltime: int | None = None, + auto_schedule: bool = False, + ) -> str: + """ + Launch an experiment and return its id + + """ + # preprocess args (str) + print("[AV] Preprocessing args...") + gateway_id = str(gateway_id or self.gateway_conf.GATEWAY_ID) + queue_name = str(queue_name or self.experiment_conf.QUEUE_NAME) + grp_name = str(grp_name or self.experiment_conf.GROUP_RESOURCE_PROFILE_NAME) + sr_host = str(sr_host or self.experiment_conf.STORAGE_RESOURCE_HOST) + project_name = str(project_name or self.experiment_conf.PROJECT_NAME) + mount_point = Path(self.gateway_conf.GATEWAY_DATA_STORE_DIR) / self.user_id + + # preprocess args (int) + node_count = int(node_count or self.experiment_conf.NODE_COUNT or "1") + cpu_count = int(cpu_count or self.experiment_conf.TOTAL_CPU_COUNT or "1") + walltime = int(walltime or self.experiment_conf.WALL_TIME_LIMIT or "30") + + # validate args (str) + print("[AV] Validating args...") + assert 
len(experiment_name) > 0 + assert len(app_name) > 0 + assert len(computation_resource_name) > 0 + assert len(inputs) > 0 + assert len(gateway_id) > 0 + assert len(queue_name) > 0 + assert len(grp_name) > 0 + assert len(sr_host) > 0 + assert len(project_name) > 0 + assert len(mount_point.as_posix()) > 0 + + # validate args (int) + assert node_count > 0 + assert cpu_count > 0 + assert walltime > 0 + + # setup runtime params + print("[AV] Setting up runtime params...") + storage = self.get_storage(sr_host) + queue_name = queue_name or self.experiment_conf.QUEUE_NAME + node_count = int(node_count or self.experiment_conf.NODE_COUNT or "1") + cpu_count = int(cpu_count or self.experiment_conf.TOTAL_CPU_COUNT or "1") + walltime = int(walltime or self.experiment_conf.WALL_TIME_LIMIT or "01:00:00") + sr_id = storage.storageResourceId + + # setup application interface + print("[AV] Setting up application interface...") + app_interface_id = self.get_app_interface_id(app_name) + assert app_interface_id is not None + + # setup experiment + print("[AV] Setting up experiment...") + data_model_util = DataModelCreationUtil( + self.config_file, + username=self.user_id, + password=None, + gateway_id=gateway_id, + access_token=self.access_token, + ) + experiment = data_model_util.get_experiment_data_model_for_single_application( + experiment_name=experiment_name, + application_name=app_name, + project_name=project_name, + description=experiment_name, + ) + + # setup experiment directory + print("[AV] Setting up experiment directory...") + exp_dir = self.make_experiment_dir( + storage_resource=storage, + project_name=project_name, + experiment_name=experiment_name, + ) + abs_path = (mount_point / exp_dir.lstrip("/")).as_posix().rstrip("/") + "/" + print("[AV] exp_dir:", exp_dir) + print("[AV] abs_path:", abs_path) + + experiment = data_model_util.configure_computation_resource_scheduling( + experiment_model=experiment, + computation_resource_name=computation_resource_name, + 
group_resource_profile_name=grp_name, + storageId=sr_id, + node_count=node_count, + total_cpu_count=cpu_count, + wall_time_limit=walltime, + queue_name=queue_name, + experiment_dir_path=abs_path, + auto_schedule=auto_schedule, + ) + + # set up file inputs + print("[AV] Setting up file inputs...") + + def register_input_file(file: Path) -> str: + return str(data_model_util.register_input_file(file.name, sr_host, sr_id, file.name, abs_path)) + + # setup experiment inputs + files_to_upload = list[Path]() + file_inputs = dict[str, str | list[str]]() + data_inputs = dict[str, str | list[str] | int | float]() + for key, value in inputs.items(): + + if isinstance(value, str) and Path(value).is_file(): + file = Path(value) + files_to_upload.append(file) + file_inputs[key] = register_input_file(file) + + elif isinstance(value, list) and all([isinstance(v, str) and Path(v).is_file() for v in value]): + files = [*map(Path, value)] + files_to_upload.extend(files) + file_inputs[key] = [*map(register_input_file, files)] + + else: + data_inputs[key] = value + + # configure file inputs for experiment + print("[AV] Uploading file inputs for experiment...") + self.upload_files(storage, files_to_upload, exp_dir) + + # configure experiment inputs + experiment_inputs = [] + for exp_input in self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id): # type: ignore + if exp_input.type < 3 and exp_input.name in data_inputs: + value = data_inputs[exp_input.name] + exp_input.value = repr(value) + elif exp_input.type == 3 and exp_input.name in file_inputs: + exp_input.value = file_inputs[exp_input.name] + elif exp_input.type == 4 and exp_input.name in file_inputs: + exp_input.value = ','.join(file_inputs[exp_input.name]) + experiment_inputs.append(exp_input) + experiment.experimentInputs = experiment_inputs + + # configure experiment outputs + outputs = self.api_server_client.get_application_outputs(self.airavata_token, app_interface_id) + 
experiment.experimentOutputs = outputs + + # create experiment + ex_id = self.api_server_client.create_experiment(self.airavata_token, gateway_id, experiment) + + # TODO agent_id generate and send as input parameter + # connect to connection service after this point, and route all file-related requests through it + # later build a ssh adapter for ls type tasks + + # launch experiment + self.api_server_client.launch_experiment(self.airavata_token, ex_id, gateway_id) + + return str(ex_id) + + def get_experiment_status(self, experiment_id): + status = self.api_server_client.get_experiment_status( + self.airavata_token, experiment_id) + return status + + def stop_experiment(self, experiment_id): + status = self.api_server_client.terminate_experiment( + self.airavata_token, experiment_id, self.gateway_conf.GATEWAY_ID) + return status diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/__init__.py new file mode 100644 index 0000000000..86bf71e7f5 --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/__init__.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .device_auth import DeviceFlowAuthenticator + +context = DeviceFlowAuthenticator( + idp_url="https://auth.cybershuttle.org", + realm="10000000", + client_id="cybershuttle-agent", +) + + +def login(): + context.login() + + +def logout(): + context.logout() diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/device_auth.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/device_auth.py new file mode 100644 index 0000000000..11aada5ffa --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/auth/device_auth.py @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import time + +import requests + + +class DeviceFlowAuthenticator: + + idp_url: str + realm: str + client_id: str + device_code: str | None + interval: int + access_token: str | None + refresh_token: str | None + + @property + def logged_in(self) -> bool: + return self.access_token is not None + + def __init__( + self, + idp_url: str, + realm: str, + client_id: str, + ): + self.idp_url = idp_url + self.realm = realm + self.client_id = client_id + + if not self.client_id or not self.realm or not self.idp_url: + raise ValueError( + "Missing required environment variables for client ID, realm, or auth server URL") + + self.device_code = None + self.interval = -1 + self.access_token = None + + def login(self, interactive: bool = True): + # Step 0: Check if we have a saved token + if self.__load_saved_token__(): + print("Using saved token") + return + + # Step 1: Request device and user code + auth_device_url = f"{self.idp_url}/realms/{self.realm}/protocol/openid-connect/auth/device" + response = requests.post(auth_device_url, data={ + "client_id": self.client_id, "scope": "openid"}) + + if response.status_code != 200: + print(f"Error in device authorization request: {response.status_code} - {response.text}") + return + + data = response.json() + self.device_code = data.get("device_code") + self.interval = data.get("interval", 5) + + print(f"User code: {data.get('user_code')}") + print(f"Please authenticate by visiting: {data.get('verification_uri_complete')}") + + if interactive: + import webbrowser + + webbrowser.open(data.get("verification_uri_complete")) + + # Step 2: Poll for the token + self.__poll_for_token__() + + def logout(self): + self.access_token = None + self.refresh_token = None + + def __poll_for_token__(self): + token_url = f"{self.idp_url}/realms/{self.realm}/protocol/openid-connect/token" + print("Waiting for authorization...") + while True: + response = requests.post( + token_url, + data={ + "client_id": self.client_id, + "grant_type": 
"urn:ietf:params:oauth:grant-type:device_code", + "device_code": self.device_code, + }, + ) + if response.status_code == 200: + data = response.json() + self.refresh_token = data.get("refresh_token") + self.access_token = data.get("access_token") + print("Authorization successful!") + self.__persist_token__() + return + elif response.status_code == 400 and response.json().get("error") == "authorization_pending": + time.sleep(self.interval) + else: + print(f"Authorization error: {response.status_code} - {response.text}") + break + + def __persist_token__(self): + import json + with open("auth.state", "w") as f: + json.dump({"refresh_token": self.refresh_token, + "access_token": self.access_token}, f) + + def __load_saved_token__(self): + import json + import jwt + import datetime + try: + with open("auth.state", "r") as f: + data = json.load(f) + self.refresh_token = str(data["refresh_token"]) + self.access_token = str(data["access_token"]) + decoded = jwt.decode(self.access_token, options={"verify_signature": False}) + tA = datetime.datetime.now(datetime.timezone.utc).timestamp() + tB = int(decoded.get("exp", 0)) + return tA < tB + except (FileNotFoundError, KeyError, ValueError, StopIteration) as e: + print(e) + return False diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py new file mode 100644 index 0000000000..8fe1865d1b --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/base.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import annotations + +import abc +from itertools import product +from typing import Any, Generic, TypeVar +import uuid +import random + +from .plan import Plan +from .runtime import Runtime +from .task import Task + + +class GUIApp: + + app_id: str + + def __init__(self, app_id: str) -> None: + self.app_id = app_id + + def open(self, runtime: Runtime, location: str) -> None: + """ + Open the GUI application + """ + raise NotImplementedError() + + @classmethod + @abc.abstractmethod + def initialize(cls, **kwargs) -> GUIApp: ... + + +class ExperimentApp: + + app_id: str + + def __init__(self, app_id: str) -> None: + self.app_id = app_id + + @classmethod + @abc.abstractmethod + def initialize(cls, **kwargs) -> Experiment: ... + + +T = TypeVar("T", ExperimentApp, GUIApp) + + +class Experiment(Generic[T], abc.ABC): + + name: str + application: T + inputs: dict[str, Any] + input_mapping: dict[str, str] + resource: Runtime = Runtime.default() + tasks: list[Task] = [] + + def __init__(self, name: str, application: T): + self.name = name + self.application = application + self.input_mapping = {} + + def with_inputs(self, **inputs: Any) -> Experiment[T]: + """ + Add shared inputs to the experiment + """ + self.inputs = inputs + return self + + def with_resource(self, resource: Runtime) -> Experiment[T]: + self.resource = resource + return self + + def add_replica(self, *allowed_runtimes: Runtime) -> None: + """ + Add a replica to the experiment. + This will create a copy of the application with the given inputs. 
+ + """ + runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0 else self.resource + uuid_str = str(uuid.uuid4())[:4].upper() + + self.tasks.append( + Task( + name=f"{self.name}_{uuid_str}", + app_id=self.application.app_id, + inputs={**self.inputs}, + runtime=runtime, + ) + ) + print(f"Added replica. ({len(self.tasks)} tasks in total)") + + def add_sweep(self, *allowed_runtimes: Runtime, **space: list) -> None: + """ + Add a sweep to the experiment. + + """ + for values in product(space.values()): + runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0 else self.resource + uuid_str = str(uuid.uuid4())[:4].upper() + + task_specific_params = dict(zip(space.keys(), values)) + agg_inputs = {**self.inputs, **task_specific_params} + task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()} + + self.tasks.append(Task( + name=f"{self.name}_{uuid_str}", + app_id=self.application.app_id, + inputs=task_inputs, + runtime=runtime or self.resource, + )) + + def plan(self, **kwargs) -> Plan: + if len(self.tasks) == 0: + self.add_replica(self.resource) + tasks = [] + for t in self.tasks: + agg_inputs = {**self.inputs, **t.inputs} + task_inputs = {k: agg_inputs[v] for k, v in self.input_mapping.items()} + tasks.append(Task(name=t.name, app_id=self.application.app_id, inputs=task_inputs, runtime=t.runtime)) + return Plan(tasks=tasks) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/__init__.py new file mode 100644 index 0000000000..5c3b0d790c --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/__init__.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .applications import NAMD, VMD + +__all__ = ["NAMD", "VMD"] diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py new file mode 100644 index 0000000000..e5e30c92b2 --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/md/applications.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from typing import Literal +from ..base import Experiment, ExperimentApp, GUIApp + + +class NAMD(ExperimentApp): + """ + Nanoscale Molecular Dynamics (NAMD, formerly Not Another Molecular Dynamics Program) + is a computer software for molecular dynamics simulation, written using the Charm++ + parallel programming model (not to be confused with CHARMM). + It is noted for its parallel efficiency and is often used to simulate large systems + (millions of atoms). It has been developed by the collaboration of the Theoretical + and Computational Biophysics Group (TCB) and the Parallel Programming Laboratory (PPL) + at the University of Illinois Urbana–Champaign. + + """ + + def __init__( + self, + ) -> None: + super().__init__(app_id="NAMD") + + @classmethod + def initialize( + cls, + name: str, + config_file: str, + pdb_file: str, + psf_file: str, + ffp_files: list[str], + other_files: list[str] = [], + parallelism: Literal["CPU", "GPU"] = "CPU", + num_replicas: int = 1, + ) -> Experiment[ExperimentApp]: + app = cls() + obj = Experiment[ExperimentApp](name, app).with_inputs( + config_file=config_file, + pdb_file=pdb_file, + psf_file=psf_file, + ffp_files=ffp_files, + parallelism=parallelism, + other_files=other_files, + num_replicas=num_replicas, + ) + obj.input_mapping = { + "MD-Instructions-Input": "config_file", # uri? [REQUIRED] + "Coordinates-PDB-File": "pdb_file", # uri? [OPTIONAL] + "Protein-Structure-File_PSF": "psf_file", # uri? [REQUIRED] + "FF-Parameter-Files": "ffp_files", # uri[]? [REQUIRED] + "Execution_Type": "parallelism", # "CPU" | "GPU" [REQUIRED] + "Optional_Inputs": "other_files", # uri[]? [OPTIONAL] + "Number of Replicas": "num_replicas", # integer [REQUIRED] + # "Constraints-PDB": "pdb_file", # uri? [OPTIONAL] + # "Replicate": None, # "yes"? [OPTIONAL] + # "Continue_from_Previous_Run?": None, # "yes"? [OPTIONAL] + # "Previous_JobID": None, # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" 
== "yes"] + # "GPU Resource Warning": None, # string? [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] + # "Restart_Replicas_List": None, # string [OPTIONAL] [show if "Continue_from_Previous_Run?" == "yes"] + } + obj.tasks = [] + return obj + + +class VMD(GUIApp): + """ + Visual Molecular Dynamics (VMD) is a molecular visualization and analysis program + designed for biological systems such as proteins, nucleic acids, lipid bilayer assemblies, + etc. It also includes tools for working with volumetric data, sequence data, and arbitrary + graphics objects. VMD can be used to animate and analyze the trajectory of molecular dynamics + simulations, and can interactively manipulate molecules being simulated on remote computers + (Interactive MD). + + """ + + def __init__( + self, + ) -> None: + super().__init__(app_id="vmd") + + @classmethod + def initialize( + cls, + name: str, + ) -> GUIApp: + app = cls() + return app diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py new file mode 100644 index 0000000000..4d098215b3 --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
import json
import time

import pydantic
from rich.progress import Progress
from .runtime import Runtime
from .task import Task


class Plan(pydantic.BaseModel):
    """A list of tasks that are launched, monitored, stopped and collected together."""

    tasks: list[Task] = []

    @pydantic.field_validator("tasks", mode="before")
    @classmethod
    def default_tasks(cls, v):
        """Turn plain dicts in an incoming task list into ``Task`` objects."""
        if isinstance(v, list):
            return [Task(**task) if isinstance(task, dict) else task for task in v]
        return v

    def describe(self) -> None:
        """Print every task in the plan."""
        for task in self.tasks:
            print(task)

    def __stage_prepare__(self) -> None:
        """Announce the plan and print its tasks."""
        print("Preparing execution plan...")
        self.describe()

    def __stage_confirm__(self, silent: bool) -> None:
        """Ask the user to confirm the plan unless ``silent`` is set.

        Raises:
            Exception: If the user answers "n".
        """
        print("Confirming execution plan...")
        if silent:
            return
        # Keep asking until the answer is one we recognise.
        while True:
            answer = input("Here is the execution plan. continue? (Y/n) ").upper()
            if answer == "N":
                raise Exception("Execution was aborted by user.")
            if answer in ("Y", ""):
                return

    def __stage_launch_task__(self) -> None:
        """Launch every task in order."""
        print("Launching tasks...")
        for task in self.tasks:
            task.launch()

    def __stage_status__(self) -> list:
        """Return the current status of every task, in task order."""
        return [task.status() for task in self.tasks]

    def __stage_stop__(self) -> None:
        """Stop every task in order."""
        print("Stopping task(s)...")
        for task in self.tasks:
            task.stop()
        print("Task(s) stopped.")

    def __stage_fetch__(self) -> list[list[str]]:
        """Download every file of every task.

        Returns:
            One list per task holding whatever ``runtime.download`` returned
            for each file listed by ``task.files()``.
        """
        print("Fetching results...")
        fetched: list[list[str]] = []
        for task in self.tasks:
            assert task.ref is not None
            runtime = task.runtime
            # Runtime.download takes (file, task); the task reference must not
            # be passed in place of the file name.
            fetched.append([runtime.download(remote_fp, task) for remote_fp in task.files()])
        print("Results fetched.")
        return fetched

    def launch(self, silent: bool = False) -> None:
        """Prepare, confirm and launch the plan.

        Any exception raised on the way (including a user abort) is reported
        by printing its arguments instead of propagating.
        """
        try:
            self.__stage_prepare__()
            self.__stage_confirm__(silent)
            self.__stage_launch_task__()
        except Exception as e:
            print(*e.args, sep="\n")

    def join(self, check_every_n_mins: float = 0.1) -> None:
        """Poll task statuses until every task is in a terminal state.

        Args:
            check_every_n_mins: Minutes to wait between two polling rounds.

        A ``KeyboardInterrupt`` stops the wait and is reported, not re-raised.
        """
        n = len(self.tasks)
        # status.state is used as an index into this list.
        states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"]
        terminal_states = {"CANCELED", "COMPLETED", "FAILED"}
        sleep_time = check_every_n_mins * 60
        try:
            with Progress() as progress:
                pbars = [progress.add_task(f"{task.name} ({i+1}/{n})", total=None) for i, task in enumerate(self.tasks)]
                completed = [False] * n
                while not all(completed):
                    statuses = self.__stage_status__()
                    for i, (task, status) in enumerate(zip(self.tasks, statuses)):
                        state_text = states[status.state]
                        pbar = pbars[i]
                        progress.update(pbar, description=f"{task.name} ({i+1}/{n}): {state_text}")
                        if state_text in terminal_states:
                            completed[i] = True
                            # The bar was created without a total; give it one
                            # so that it is rendered as finished.
                            progress.update(pbar, total=1, completed=1)
                    if all(completed):
                        # Nothing left to wait for; skip the final sleep.
                        break
                    time.sleep(sleep_time)
            print("Task(s) complete.")
        except KeyboardInterrupt:
            print("Interrupted by user.")

    def stop(self) -> None:
        """Stop every task in the plan."""
        self.__stage_stop__()

    def save_json(self, filename: str) -> None:
        """Write the plan, as produced by ``model_dump()``, to ``filename`` as JSON."""
        with open(filename, "w") as f:
            json.dump(self.model_dump(), f, indent=2)

    @staticmethod
    def load_json(filename: str) -> "Plan":
        """Read a plan from a JSON file written by ``save_json``."""
        with open(filename, "r") as f:
            model = json.load(f)
        return Plan(**model)

    def collect_results(self, runtime: Runtime) -> list[list[str]]:
        """Download the files of every task.

        ``runtime`` is accepted but not used; each task is fetched through its
        own ``task.runtime``.
        """
        return self.__stage_fetch__()
from .auth import context
import abc
from typing import Any

import pydantic
import requests
import uuid
import time

# Tasks are typed loosely here; NOTE(review): presumably to avoid importing
# task.py, which itself imports this module — confirm.
Task = Any


conn_svc_url = "api.gateway.cybershuttle.org"


def _submit_agent_command(agent_id: str, arguments: list[str]) -> dict:
    """POST a command for the given agent and return the decoded JSON reply."""
    res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executecommandrequest", json={
        "agentId": agent_id,
        "workingDir": ".",
        "arguments": arguments,
    })
    return res.json()


def _await_agent_response(execution_id: str) -> str:
    """Poll once per second until the command's response is available; return its text."""
    while True:
        res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executecommandresponse/{execution_id}")
        data = res.json()
        if data["available"]:
            return data["responseString"]
        time.sleep(1)


class Runtime(abc.ABC, pydantic.BaseModel):
    """Abstract place where a task can be executed, inspected and stopped."""

    # Runtime kind; "mock" and "remote" are the ids understood by create().
    id: str
    # Keyword arguments the runtime was built with.
    args: dict[str, str | int | float] = pydantic.Field(default={})

    @abc.abstractmethod
    def execute(self, task: Task) -> None: ...

    @abc.abstractmethod
    def status(self, task: Task) -> str: ...

    @abc.abstractmethod
    def signal(self, signal: str, task: Task) -> None: ...

    @abc.abstractmethod
    def ls(self, task: Task) -> list[str]: ...

    @abc.abstractmethod
    def download(self, file: str, task: Task) -> str: ...

    def __str__(self) -> str:
        return f"{self.__class__.__name__}(args={self.args})"

    @staticmethod
    def default():
        """Return the default runtime (currently ``Remote.default()``)."""
        # return Mock()
        return Remote.default()

    @staticmethod
    def create(id: str, args: dict[str, Any]) -> "Runtime":
        """Build a runtime from its id and constructor arguments.

        Raises:
            ValueError: If ``id`` is neither "mock" nor "remote".
        """
        if id == "mock":
            return Mock(**args)
        elif id == "remote":
            return Remote(**args)
        else:
            raise ValueError(f"Unknown runtime id: {id}")

    @staticmethod
    def Remote(**kwargs):
        """Shorthand for ``Remote(**kwargs)``."""
        return Remote(**kwargs)

    @staticmethod
    def Local(**kwargs):
        """Shorthand for ``Mock(**kwargs)``."""
        return Mock(**kwargs)


class Mock(Runtime):
    """Runtime that performs no work and reports randomly advancing progress."""

    _state: int = 0

    def __init__(self, **kwargs) -> None:
        # Accept keyword arguments like Remote does, so Runtime.Local(**kwargs)
        # and Runtime.create("mock", args) work with non-empty arguments.
        super().__init__(id="mock", args=kwargs)

    def execute(self, task: Task) -> None:
        """Assign fresh random references to the task."""
        task.agent_ref = str(uuid.uuid4())
        task.ref = str(uuid.uuid4())

    def status(self, task: Task) -> str:
        """Advance the internal counter by 0-5; "COMPLETED" once it exceeds 10."""
        import random

        self._state += random.randint(0, 5)
        if self._state > 10:
            return "COMPLETED"
        return "RUNNING"

    def signal(self, signal: str, task: Task) -> None:
        pass

    def ls(self, task: Task) -> list[str]:
        return []

    def download(self, file: str, task: Task) -> str:
        return ""

    @staticmethod
    def default():
        return Mock()


class Remote(Runtime):
    """Runtime that launches tasks through Airavata and talks to a remote agent."""

    def __init__(self, **kwargs) -> None:
        super().__init__(id="remote", args=kwargs)

    def execute(self, task: Task) -> None:
        """Launch the task as an experiment on the cluster named in ``args["cluster"]``.

        Sets ``task.agent_ref`` to a new UUID and ``task.ref`` to the value
        returned by ``launch_experiment``.
        """
        assert context.access_token is not None
        assert task.ref is None
        assert task.agent_ref is None

        from .airavata import AiravataOperator
        av = AiravataOperator(context.access_token)
        print(f"[Remote] Experiment Created: name={task.name}")
        assert "cluster" in self.args
        task.agent_ref = str(uuid.uuid4())
        task.ref = av.launch_experiment(
            experiment_name=task.name,
            app_name=task.app_id,
            computation_resource_name=str(self.args["cluster"]),
            inputs={**task.inputs, "agent_id": task.agent_ref, "server_url": conn_svc_url}
        )
        print(f"[Remote] Experiment Launched: id={task.ref}")

    def status(self, task: Task):
        """Return whatever ``get_experiment_status`` reports for the task."""
        assert context.access_token is not None
        assert task.ref is not None
        assert task.agent_ref is not None

        from .airavata import AiravataOperator
        av = AiravataOperator(context.access_token)
        return av.get_experiment_status(task.ref)

    def signal(self, signal: str, task: Task) -> None:
        """Stop the experiment. The ``signal`` value itself is not used."""
        assert context.access_token is not None
        assert task.ref is not None
        assert task.agent_ref is not None

        from .airavata import AiravataOperator
        av = AiravataOperator(context.access_token)
        av.stop_experiment(task.ref)

    def ls(self, task: Task) -> list[str]:
        """List ``/data`` through the task's agent.

        Returns:
            The non-empty lines of the ``ls`` output, or an empty list while
            the agent is not registered yet.

        Raises:
            Exception: If the service reports any other error.
        """
        assert context.access_token is not None
        assert task.ref is not None
        assert task.agent_ref is not None

        data = _submit_agent_command(task.agent_ref, ["ls", "/data"])
        if data["error"] is not None:
            if str(data["error"]) == "Agent not found":
                print("Experiment is initializing...")
                return []
            raise Exception(data["error"])
        listing = _await_agent_response(data["executionId"])
        # Drop empty entries (e.g. the one left by a trailing newline) so that
        # callers never try to download a file with an empty name.
        return [name for name in listing.split("\n") if name]

    def download(self, file: str, task: Task) -> str:
        """Return the output of ``cat /data/<file>`` run by the task's agent.

        Raises:
            Exception: If the service reports an error.
        """
        assert context.access_token is not None
        assert task.ref is not None
        assert task.agent_ref is not None

        # The path lives on the remote host, so join it with POSIX rules
        # regardless of the local operating system.
        import posixpath

        data = _submit_agent_command(task.agent_ref, ["cat", posixpath.join("/data", file)])
        if data["error"] is not None:
            raise Exception(data["error"])
        return _await_agent_response(data["executionId"])

    @staticmethod
    def default():
        return Remote(
            cluster="login.expanse.sdsc.edu",
        )


def list_runtimes(**kwargs) -> list[Runtime]:
    """Return the available runtimes (currently a fixed list; ``kwargs`` are unused)."""
    # TODO get list using token
    return [Remote(cluster="login.expanse.sdsc.edu"), Remote(cluster="anvil.rcac.purdue.edu")]
import logging
from pathlib import Path
from datetime import datetime
from rich.progress import Progress

import paramiko
from paramiko import SFTPClient, Transport
from scp import SCPClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logging.getLogger("paramiko").setLevel(logging.WARNING)


def create_pkey(pkey_path):
    """Load an RSA private key from ``pkey_path``; return ``None`` when no path is given."""
    if pkey_path is None:
        return None
    return paramiko.RSAKey.from_private_key_file(pkey_path)


class SFTPConnector(object):
    """Upload and download experiment files over SFTP/SCP."""

    def __init__(self, host, port, username, password=None, pkey=None):
        """Store the connection settings.

        Args:
            host: Remote host name.
            port: Remote port (converted with ``int`` when a transport is opened).
            username: Login name.
            password: Password, used when ``pkey`` is ``None``.
            pkey: Path to a private key file; takes precedence over ``password``.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.pkey = pkey

        ssh = paramiko.SSHClient()
        self.ssh = ssh
        # NOTE(review): AutoAddPolicy accepts any unknown host key, so the
        # remote host's identity is not verified.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    def _open_transport(self) -> Transport:
        """Open and authenticate a transport; close it again if authentication fails."""
        transport = Transport(sock=(self.host, int(self.port)))
        try:
            if self.pkey is not None:
                transport.connect(username=self.username, pkey=create_pkey(self.pkey))
            else:
                transport.connect(username=self.username, password=self.password)
        except Exception:
            transport.close()
            raise
        return transport

    @staticmethod
    def _ensure_dir(connection: SFTPClient, path: str) -> None:
        """Create ``path`` on the remote side when ``lstat`` cannot find it."""
        try:
            connection.lstat(path)
        except IOError:
            connection.mkdir(path)

    def upload_files(self, localpaths: list[Path], remote_path: str):
        """Upload files into ``remote_path``, creating that directory if needed.

        Args:
            localpaths: Local files; each is stored under its own file name.
            remote_path: Destination directory on the remote host.

        Returns:
            ``remote_path``.
        """
        transport = self._open_transport()
        try:
            # One SFTP session and one directory check serve the whole batch.
            connection = SFTPClient.from_transport(transport)
            assert connection is not None
            self._ensure_dir(connection, remote_path)
            with Progress() as progress:
                task = progress.add_task("Uploading...", total=len(localpaths))
                for file in localpaths:
                    remote_fpath = remote_path + "/" + file.name
                    connection.put(str(file), remote_fpath)
                    progress.update(task, advance=1, description=f"Uploading: {file.name}")
        finally:
            transport.close()
        return remote_path

    def make_experiment_dir(self, project_name: str, exprement_id, remote_base=""):
        """Create ``<remote_base>/<project>/<experiment>_<timestamp>`` on the remote host.

        Spaces in ``project_name`` become underscores; the timestamp has the
        form ``YYYY_MM_DD_HH_MM_SS``.

        Returns:
            The path of the experiment directory.
        """
        project_name = project_name.replace(" ", "_")
        stamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        base_path = remote_base + "/" + project_name
        remote_path = base_path + "/" + exprement_id + "_" + stamp

        transport = self._open_transport()
        try:
            connection = SFTPClient.from_transport(transport)
            assert connection is not None
            # Parent first: mkdir is not recursive.
            self._ensure_dir(connection, base_path)
            self._ensure_dir(connection, remote_path)
        finally:
            transport.close()

        return remote_path

    def download_files(self, local_path, remote_path):
        """Recursively copy ``remote_path`` to ``local_path`` over SCP."""
        if self.pkey is not None:
            self.ssh.connect(self.host, self.port, self.username,
                             pkey=create_pkey(self.pkey))
        else:
            self.ssh.connect(self.host, self.port,
                             self.username, password=self.password)
        try:
            transport = self.ssh.get_transport()
            assert transport is not None
            with SCPClient(transport) as conn:
                conn.get(remote_path=remote_path,
                         local_path=local_path, recursive=True)
        finally:
            # Close the SSH session even when the transfer fails.
            self.ssh.close()

    @staticmethod
    def uploading_info(uploaded_file_size, total_file_size):
        """Log upload progress through this module's logger."""
        logger.info("uploaded_file_size : {} total_file_size : {}".format(
            uploaded_file_size, total_file_size))
from typing import Any

import pydantic

from .runtime import Runtime


class Task(pydantic.BaseModel):
    """A single unit of work bound to the runtime that executes it."""

    name: str
    app_id: str
    inputs: dict[str, Any]
    runtime: Runtime
    # Set by runtime.execute(); None until the task has been launched.
    ref: str | None = pydantic.Field(default=None)
    agent_ref: str | None = pydantic.Field(default=None)

    @pydantic.field_validator("runtime", mode="before")
    @classmethod
    def set_runtime(cls, v):
        """Build a ``Runtime`` from a ``{"id": ..., "args": ...}`` dict.

        The incoming dict is only read, never modified, so the same payload
        can be validated more than once. Other values pass through unchanged.
        """
        if isinstance(v, dict) and "id" in v:
            return Runtime.create(id=v["id"], args=v.get("args", {}))
        return v

    def __str__(self) -> str:
        return f"Task(\nname={self.name}\napp_id={self.app_id}\ninputs={self.inputs}\nruntime={self.runtime}\n)"

    def launch(self) -> None:
        """Execute the task on its runtime; it must not have been launched before."""
        assert self.ref is None
        print(f"[Task] Executing {self.name} on {self.runtime}")
        self.runtime.execute(self)

    def status(self) -> str:
        """Return the runtime's status for this launched task."""
        assert self.ref is not None
        return self.runtime.status(self)

    def files(self) -> list[str]:
        """Return the runtime's file listing for this launched task."""
        assert self.ref is not None
        return self.runtime.ls(self)

    def cat(self, file: str) -> str:
        """Return what the runtime's ``download`` yields for ``file``."""
        assert self.ref is not None
        return self.runtime.download(file, self)

    def stop(self) -> None:
        """Send "SIGTERM" to this launched task through its runtime."""
        assert self.ref is not None
        self.runtime.signal("SIGTERM", self)
index badd5f9755..734ec82b47 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/sftp_file_handling_client.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/sftp_file_handling_client.py @@ -28,13 +28,20 @@ logger.setLevel(logging.INFO) logging.getLogger("paramiko").setLevel(logging.WARNING) +def create_pkey(pkey_path): + if pkey_path is not None: + return paramiko.RSAKey.from_private_key_file(pkey_path) + return None + + class SFTPConnector(object): - def __init__(self, host, port, username, password): + def __init__(self, host, port, username, password = None, pkey = None): self.host = host self.port = port self.username = username self.password = password + self.pkey = pkey ssh = paramiko.SSHClient() self.ssh = ssh @@ -44,38 +51,38 @@ class SFTPConnector(object): ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - def upload_files(self, local_path, project_name, exprement_id): + def upload_files(self, local_path, remote_base, project_name, exprement_id): project_name = project_name.replace(" ", "_") time = datetime.now().strftime('%Y-%m-%d %H:%M:%S').replace(" ", "_") time = time.replace(":", "_") time = time.replace("-", "_") exprement_id = exprement_id+"_"+time - remote_path = "/" + project_name + "/" + exprement_id + "/" - pathsuffix = self.username + remote_path + base_path = remote_base + "/" + project_name + remote_path = base_path + "/" + exprement_id + # pathsuffix = self.username + remote_path files = os.listdir(local_path) - for file in files: - try: - transport = Transport(sock=(self.host, int(self.port))) - transport.connect(username=self.username, password=self.password) + transport = Transport(sock=(self.host, int(self.port))) + transport.connect(username=self.username, password=self.password, pkey=create_pkey(self.pkey)) + try: + for file in files: connection = SFTPClient.from_transport(transport) try: - base_path = "/" + project_name - connection.chdir(base_path) # Test 
if remote_path exists + connection.lstat(base_path) # Test if remote_path exists except IOError: - connection.mkdir(base_path) try: - connection.chdir(remote_path) # Test if remote_path exists + connection.lstat(remote_path) # Test if remote_path exists except IOError: connection.mkdir(remote_path) - connection.put(os.path.join(local_path, file), remote_path + "/" + file) - finally: - transport.close() - return pathsuffix + remote_fpath = remote_path + "/" + file + print(f"{file} -> {remote_fpath}") + connection.put(os.path.join(local_path, file), remote_fpath) + finally: + transport.close() + return remote_path def download_files(self, local_path, remote_path): - - self.ssh.connect(self.host, self.port, self.username, password = self.password) + self.ssh.connect(self.host, self.port, self.username, password=self.password, pkey=create_pkey(self.pkey)) with SCPClient(self.ssh.get_transport()) as conn: conn.get(remote_path=remote_path, local_path= local_path, recursive= True) self.ssh.close() diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/utils/experiment_handler_util.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/utils/experiment_handler_util.py index 9e81f539b7..df3991de67 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/utils/experiment_handler_util.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/clients/utils/experiment_handler_util.py @@ -38,15 +38,14 @@ logger.setLevel(logging.INFO) class ExperimentHandlerUtil(object): - def __init__(self, configuration_file_location=None): + def __init__(self, configuration_file_location=None, access_token=None): self.configuration_file = configuration_file_location - self.authenticator = Authenticator(configuration_file_location) self.gateway_conf = GatewaySettings(configuration_file_location) self.experiment_conf = ExperimentSettings(configuration_file_location) - self.keycloak_conf = 
KeycloakConfiguration(configuration_file_location) self.authenticator = Authenticator(self.configuration_file) - self.authenticator.authenticate_with_auth_code() - access_token = getpass.getpass('Copy paste the access token') + if access_token is None: + self.authenticator.authenticate_with_auth_code() + access_token = getpass.getpass('Copy paste the access token') self.access_token = access_token decode = jwt.decode(access_token, options={"verify_signature": False}) self.user_id = decode['preferred_username'] @@ -178,7 +177,7 @@ class ExperimentHandlerUtil(object): logger.info("experiment launched id: %s", ex_id) - experiment_url = 'https://' + self.gateway_conf.GATEWAY_ID + '.org/workspace/experiments/' + ex_id + experiment_url = 'https://' + self.gateway_conf.GATEWAY_URL + '.org/workspace/experiments/' + ex_id logger.info("For more information visit %s", experiment_url) if self.experiment_conf.MONITOR_STATUS: diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/transport/settings.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/transport/settings.py index b1e61f22aa..d36cc65550 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/transport/settings.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_sdk/transport/settings.py @@ -113,6 +113,7 @@ class GatewaySettings(object): if configFileLocation is not None: config.read(configFileLocation) self.GATEWAY_ID = config.get('Gateway', 'GATEWAY_ID') + self.GATEWAY_URL = config.get('Gateway', 'GATEWAY_URL') self.GATEWAY_DATA_STORE_RESOURCE_ID = config.get('Gateway', 'GATEWAY_DATA_STORE_RESOURCE_ID') self.GATEWAY_DATA_STORE_DIR = config.get('Gateway', 'GATEWAY_DATA_STORE_DIR') self.GATEWAY_DATA_STORE_HOSTNAME = config.get('Gateway', 'GATEWAY_DATA_STORE_HOSTNAME') diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml index 
bb2e223234..79ba8eedbc 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml @@ -3,13 +3,13 @@ requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" [project] -name = "airavata-python-sdk" -version = "1.1.6" +name = "airavata-python-sdk-test" +version = "0.0.3" description = "Apache Airavata Python SDK" readme = "README.md" license = { text = "Apache License 2.0" } authors = [{ name = "Airavata Developers", email = "[email protected]" }] -requires-python = ">=3.6" +requires-python = ">=3.10" dependencies = [ "oauthlib", "requests", @@ -21,12 +21,16 @@ dependencies = [ "pysftp", "configparser", "urllib3", - "pyjwt" + "pyjwt", + "pydantic", + "rich", + "ipywidgets", ] [tool.setuptools.packages.find] where = ["."] include = ["airavata*"] +exclude = ["*.egg-info"] [tool.setuptools.package-data] "airavata_sdk.transport" = ["*.ini"] diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/requirements.txt b/airavata-api/airavata-client-sdks/airavata-python-sdk/requirements.txt deleted file mode 100644 index c7f5deda08..0000000000 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/requirements.txt +++ /dev/null @@ -1,11 +0,0 @@ -oauthlib -requests -requests-oauthlib -thrift==0.16.0 -thrift_connector -paramiko -scp -pysftp -configparser -urllib3 -pyjwt diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/annotations.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/annotations.py new file mode 100644 index 0000000000..2c368b69f3 --- /dev/null +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/annotations.py @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
import functools
import inspect
import time


def _source_of(obj) -> str:
    """Return the source of ``obj``, or a placeholder when it cannot be retrieved."""
    try:
        return inspect.getsource(obj)
    except (OSError, TypeError):
        # Built-ins and objects defined interactively have no retrievable source.
        return "<source unavailable>"


# Define the decorator factory
def analyze_replica(plan_id = 0, replica_id=1):
    """Return a decorator that prints session context before calling the function.

    On every call the wrapper prints ``plan_id`` and ``replica_id``, the source
    of the decorated function, and the source of every function and class
    found in this module's globals. It then calls the function and returns its
    result unchanged.
    """
    # The decorator factory itself is not part of the analysed session.
    skipped = {"analyze_replica"}

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            # Filter loaded functions and classes
            session = globals()
            loaded_functions = {name: obj for name, obj in session.items() if inspect.isfunction(obj)}
            loaded_classes = {name: obj for name, obj in session.items() if inspect.isclass(obj)}

            print("Plan id ", plan_id, " Replica id ", replica_id)
            print("Passed function")
            print(_source_of(func))

            print("Functions loaded in session:")
            for name, f in loaded_functions.items():
                if name in skipped or name.startswith("_"):
                    continue
                print(f"- {name}:")
                print(_source_of(f))

            print("\nClasses loaded in session:")
            for name, cls in loaded_classes.items():
                print(f"- {name}:")
                print(_source_of(cls))

            # Call the original function and hand its result back.
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Example standalone function
def print_something():
    """Print a fixed message."""
    print("PRINTING SOMETHING")


def print_some_int(integer= 10):
    """Print a fixed message followed by ``integer``."""
    print("PRINTING SOMETHING ", integer)


# Apply the decorator with a parameter
@analyze_replica(plan_id = 100, replica_id=10110)
def example_function(n):
    """Sleep for ``n`` seconds, print a message and report the duration."""
    time.sleep(n)  # Simulate a delay
    print_something()
    return f"Function ran for {n} seconds."


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    print(example_function(2))
\u001b[?25ldone\n", + "\u001b[?25hRequirement already satisfied: oauthlib in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (3.2.2)\n", + "Requirement already satisfied: requests in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (2.32.3)\n", + "Requirement already satisfied: requests-oauthlib in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (2.0.0)\n", + "Requirement already satisfied: thrift~=0.16.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (0.16.0)\n", + "Requirement already satisfied: thrift_connector in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (0.24)\n", + "Requirement already satisfied: paramiko in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (3.5.0)\n", + "Requirement already satisfied: scp in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (0.15.0)\n", + "Requirement already satisfied: pysftp in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (0.2.9)\n", + "Requirement already satisfied: configparser in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (7.1.0)\n", + "Requirement already satisfied: urllib3 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (2.2.3)\n", + "Requirement already satisfied: pyjwt in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (2.10.1)\n", + "Requirement already satisfied: pydantic in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (2.10.3)\n", + "Requirement 
already satisfied: rich in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (13.9.4)\n", + "Requirement already satisfied: ipywidgets in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from airavata-python-sdk-test==0.0.2) (8.1.5)\n", + "Requirement already satisfied: six>=1.7.2 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from thrift~=0.16.0->airavata-python-sdk-test==0.0.2) (1.16.0)\n", + "Requirement already satisfied: comm>=0.1.3 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipywidgets->airavata-python-sdk-test==0.0.2) (0.2.2)\n", + "Requirement already satisfied: ipython>=6.1.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipywidgets->airavata-python-sdk-test==0.0.2) (8.30.0)\n", + "Requirement already satisfied: traitlets>=4.3.1 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipywidgets->airavata-python-sdk-test==0.0.2) (5.14.3)\n", + "Requirement already satisfied: widgetsnbextension~=4.0.12 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipywidgets->airavata-python-sdk-test==0.0.2) (4.0.13)\n", + "Requirement already satisfied: jupyterlab-widgets~=3.0.12 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipywidgets->airavata-python-sdk-test==0.0.2) (3.0.13)\n", + "Requirement already satisfied: bcrypt>=3.2 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from paramiko->airavata-python-sdk-test==0.0.2) (4.2.1)\n", + "Requirement already satisfied: cryptography>=3.3 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from paramiko->airavata-python-sdk-test==0.0.2) (44.0.0)\n", + "Requirement already satisfied: pynacl>=1.5 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from paramiko->airavata-python-sdk-test==0.0.2) (1.5.0)\n", + "Requirement already satisfied: annotated-types>=0.6.0 in 
/Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from pydantic->airavata-python-sdk-test==0.0.2) (0.7.0)\n", + "Requirement already satisfied: pydantic-core==2.27.1 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from pydantic->airavata-python-sdk-test==0.0.2) (2.27.1)\n", + "Requirement already satisfied: typing-extensions>=4.12.2 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from pydantic->airavata-python-sdk-test==0.0.2) (4.12.2)\n", + "Requirement already satisfied: charset-normalizer<4,>=2 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from requests->airavata-python-sdk-test==0.0.2) (3.4.0)\n", + "Requirement already satisfied: idna<4,>=2.5 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from requests->airavata-python-sdk-test==0.0.2) (3.10)\n", + "Requirement already satisfied: certifi>=2017.4.17 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from requests->airavata-python-sdk-test==0.0.2) (2024.8.30)\n", + "Requirement already satisfied: markdown-it-py>=2.2.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from rich->airavata-python-sdk-test==0.0.2) (3.0.0)\n", + "Requirement already satisfied: pygments<3.0.0,>=2.13.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from rich->airavata-python-sdk-test==0.0.2) (2.18.0)\n", + "Requirement already satisfied: cffi>=1.12 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from cryptography>=3.3->paramiko->airavata-python-sdk-test==0.0.2) (1.17.1)\n", + "Requirement already satisfied: decorator in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (5.1.1)\n", + "Requirement already satisfied: jedi>=0.16 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.19.2)\n", + "Requirement 
already satisfied: matplotlib-inline in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.1.7)\n", + "Requirement already satisfied: pexpect>4.3 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (4.9.0)\n", + "Requirement already satisfied: prompt_toolkit<3.1.0,>=3.0.41 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (3.0.48)\n", + "Requirement already satisfied: stack_data in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.6.2)\n", + "Requirement already satisfied: mdurl~=0.1 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from markdown-it-py>=2.2.0->rich->airavata-python-sdk-test==0.0.2) (0.1.2)\n", + "Requirement already satisfied: pycparser in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from cffi>=1.12->cryptography>=3.3->paramiko->airavata-python-sdk-test==0.0.2) (2.22)\n", + "Requirement already satisfied: parso<0.9.0,>=0.8.4 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from jedi>=0.16->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.8.4)\n", + "Requirement already satisfied: ptyprocess>=0.5 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from pexpect>4.3->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.7.0)\n", + "Requirement already satisfied: wcwidth in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from prompt_toolkit<3.1.0,>=3.0.41->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.2.13)\n", + "Requirement already satisfied: executing>=1.2.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from 
stack_data->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (2.1.0)\n", + "Requirement already satisfied: asttokens>=2.1.0 in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from stack_data->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (3.0.0)\n", + "Requirement already satisfied: pure-eval in /Users/yasith/.mamba/envs/airavata/lib/python3.12/site-packages (from stack_data->ipython>=6.1.0->ipywidgets->airavata-python-sdk-test==0.0.2) (0.2.3)\n", + "Building wheels for collected packages: airavata-python-sdk-test\n", + " Building editable for airavata-python-sdk-test (pyproject.toml) ... \u001b[?25ldone\n", + "\u001b[?25h Created wheel for airavata-python-sdk-test: filename=airavata_python_sdk_test-0.0.2-0.editable-py3-none-any.whl size=11284 sha256=c3d58cfa6d1cd393fa9ff8ff597e416e466721170a37adcd4b3429d39076dc3e\n", + " Stored in directory: /private/var/folders/_n/fcf6nx4j67gbbt4_8mjqxdc80000gn/T/pip-ephem-wheel-cache-srerellb/wheels/6a/64/3a/ba5bbd28958f1b9f1f2d15d2c8999c899e17c402760ebd7d24\n", + "Successfully built airavata-python-sdk-test\n", + "Installing collected packages: airavata-python-sdk-test\n", + " Attempting uninstall: airavata-python-sdk-test\n", + " Found existing installation: airavata-python-sdk-test 0.0.2\n", + " Uninstalling airavata-python-sdk-test-0.0.2:\n", + " Successfully uninstalled airavata-python-sdk-test-0.0.2\n", + "Successfully installed airavata-python-sdk-test-0.0.2\n", + "Note: you may need to restart the kernel to use updated packages.\n", + "/Users/yasith/projects/artisan/airavata/airavata-api/airavata-client-sdks/airavata-python-sdk/samples\n" + ] + } + ], + "source": [ + "%pip install --upgrade airavata-python-sdk-test" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Import the Experiments SDK" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import airavata_experiments as ae\n", + "from 
airavata_experiments import md" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Authenticate for Remote Execution\n", + "\n", + "To authenticate for remote execution, call the `ae.login()` method.\n", + "This method will prompt you to enter your credentials and authenticate your session." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using saved token\n" + ] + } + ], + "source": [ + "ae.login()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user can access." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "runtimes = ae.list_runtimes()\n", + "display(runtimes)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Upload Experiment Files\n", + "\n", + "Drag and drop experiment files onto the workspace that this notebook is run on.\n", + "\n", + "```bash\n", + "(sh) $: tree data\n", + "data\n", + "├── b4pull.pdb\n", + "├── b4pull.restart.coor\n", + "├── b4pull.restart.vel\n", + "├── b4pull.restart.xsc\n", + "├── par_all36_water.prm\n", + "├── par_all36m_prot.prm\n", + "├── pull.conf\n", + "├── structure.pdb\n", + "└── structure.psf\n", + "\n", + "1 directory, 9 files\n", + "\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define a NAMD Experiment\n", + "\n", + "The `md.NAMD.initialize()` function is used to define a NAMD experiment.\n", + "Here, provide the paths to the `.conf` file, the `.pdb` file, the `.psf` file, and any optional files you want to run NAMD on.\n", + "You can preview the function definition through auto-completion.\n", + "\n", + "```python\n", + "def initialize(\n", + " name: str,\n", + " config_file: str,\n", + " pdb_file: str,\n", + " psf_file: str,\n", + " 
ffp_files: list[str],\n", + " other_files: list[str] = [],\n", + " parallelism: Literal['CPU', 'GPU'] = \"CPU\",\n", + " num_replicas: int = 1\n", + ") -> Experiment[ExperimentApp]\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exp = md.NAMD.initialize(\n", + " name=\"yasith_namd_experiment\",\n", + " config_file=\"data/pull.conf\",\n", + " pdb_file=\"data/structure.pdb\",\n", + " psf_file=\"data/structure.psf\",\n", + " ffp_files=[\n", + " \"data/par_all36_water.prm\",\n", + " \"data/par_all36m_prot.prm\"\n", + " ],\n", + " other_files=[\n", + " \"data/b4pull.pdb\",\n", + " \"data/b4pull.restart.coor\",\n", + " \"data/b4pull.restart.vel\",\n", + " \"data/b4pull.restart.xsc\",\n", + " ],\n", + " parallelism=\"GPU\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To add replica runs, simply call the `exp.add_replica()` function.\n", + "You can call the `add_replica()` function as many times as you want replicas.\n", + "Any optional resource constraint can be provided here." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exp.add_replica()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Execution Plan\n", + "\n", + "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan.\n", + "This plan can be exported in JSON format and imported back." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plan = exp.plan() # this will create a plan for the experiment\n", + "plan.describe() # this will describe the plan\n", + "plan.save_json(\"plan.json\") # save the plan state" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Execute the Plan" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plan = ae.load_plan(\"plan.json\")\n", + "plan.launch()\n", + "plan.save_json(\"plan.json\") # save the plan state" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Option A - Wait for Completion" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plan = ae.load_plan(\"plan.json\")\n", + "plan.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "fc07d00c22d04fc2b7d4eb2235fe810b", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Output()" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"></pre>\n" + ], + "text/plain": [] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Interrupted by user.\n" + ] + } + ], + "source": [ + "plan = ae.load_plan(\"plan.json\")\n", + "plan.join()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Option B - Terminate Execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plan = ae.load_plan(\"plan.json\")\n", + "plan.stop()" + ] + }, + { + 
"cell_type": "markdown", + "metadata": {}, + "source": [ + "## Option C - Monitor Files During Execution" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Displaying the status and files generated by each replica (task)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "ExperimentStatus(state=4, timeOfStateChange=1733417291473, reason='process started', statusId='EXPERIMENT_STATE_451b84f8-b6e8-472c-84b6-23460a4ecbdf')\n", + "['1',\n", + " 'A1497186742',\n", + " 'NAMD.stderr',\n", + " 'NAMD.stdout',\n", + " 'NAMD_Repl_.dcd',\n", + " 'NAMD_Repl_1.out',\n", + " 'b4pull.pdb',\n", + " 'b4pull.restart.coor',\n", + " 'b4pull.restart.vel',\n", + " 'b4pull.restart.xsc',\n", + " 'job_1017394371.slurm',\n", + " 'par_all36_water.prm',\n", + " 'par_all36m_prot.prm',\n", + " 'pull.conf',\n", + " 'structure.pdb',\n", + " 'structure.psf',\n", + " '']\n" + ] + }, + { + "data": { + "text/plain": [ + "\"ExeTyp=GPU\\nPJobID=\\nrep_list=\\nnum_rep=1\\ninput=pull.conf\\nagent_id=e016b89f-5eef-4e8e-b4eb-d202942dc76d\\nserver_url=api.gateway.cybershuttle.org\\n The Airavata Gateway User is scigap\\n Namd run will use the input pull.conf\\nNo replica array \\nGPU executable will be used\\nLoading gpu modules\\nGPU Run Command is time -p mpirun --hostfile ./HostFile -np 10 namd3 +p10 pull.conf\\nlrwxrwxrwx 1 scigap ind123 103 Dec 5 08:48 35590436 -> /expanse/lustre/scratch/scigap/te [...] 
+ ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "plan = ae.load_plan(\"plan.json\")\n", + "from pprint import pprint\n", + "for task in plan.tasks:\n", + " status = task.status()\n", + " print(status)\n", + " files = task.files()\n", + " pprint(files)\n", + "\n", + "display(plan.tasks[0].cat(\"NAMD.stdout\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Displaying the intermediate results generated by each replica (task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from matplotlib import pyplot as plt\n", + "import pandas as pd\n", + "\n", + "for index, task in enumerate(plan.tasks):\n", + "\n", + " @cs.task_context(task)\n", + " def visualize():\n", + " data = pd.read_csv(\"data.csv\")\n", + " plt.figure(figsize=(8, 6))\n", + " plt.plot(data[\"x\"], data[\"y\"], marker=\"o\", linestyle=\"-\", linewidth=2, markersize=6)\n", + " plt.title(f\"Plot for Replica {index} of {len(plan.tasks)}\")\n", + "\n", + " visualize()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "airavata", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.cfg b/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.cfg deleted file mode 100644 index 3397c30e4f..0000000000 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.cfg +++ /dev/null @@ -1,26 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -[bdist_wheel] -universal = 1 - -[metadata] -description-file = README.md -license_file = LICENSE - -[aliases] -test = pytest \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.py deleted file mode 100644 index a85eb3c14c..0000000000 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/setup.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -from setuptools import setup, find_packages - - -def read(fname): - with open(os.path.join(os.path.dirname(__file__), fname)) as f: - return f.read() - - -setup( - name='airavata-python-sdk', - version='1.1.6', - packages=find_packages(), - package_data={'airavata_sdk.transport': ['*.ini'], 'airavata_sdk.samples.resources': ['*.pem']}, - url='http://airavata.com', - license='Apache License 2.0', - author='Airavata Developers', - author_email='[email protected]', - description='Apache Airavata Python SDK', - install_requires=read("requirements.txt").splitlines() -)
