This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch cybershuttle-dev in repository https://gitbox.apache.org/repos/asf/airavata.git
commit cd2d2a9d7e7ddfde0e5f98ae1078aefdb403aed9 Author: yasith <[email protected]> AuthorDate: Thu Dec 12 01:39:58 2024 -0600 venv support for agent, update notebooks, show tables as pandas df, update dir refs, fix bugs. --- .../airavata_experiments/__init__.py | 65 +++++- .../airavata_experiments/plan.py | 9 +- .../airavata_experiments/runtime.py | 12 +- .../airavata_experiments/task.py | 10 +- .../airavata-python-sdk/pyproject.toml | 3 +- .../airavata-python-sdk/samples/poc.ipynb | 259 +++++---------------- modules/agent-framework/airavata-agent/agent.go | 67 ++++-- .../jupyterhub/data/1_experiment_sdk.ipynb | 123 ++++++---- 8 files changed, 262 insertions(+), 286 deletions(-) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py index 0f770f2205..c6927de016 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/__init__.py @@ -18,6 +18,69 @@ from __future__ import annotations from . import base, md, plan from .auth import login, logout -from .runtime import list_runtimes +from .runtime import list_runtimes, Runtime __all__ = ["login", "logout", "list_runtimes", "base", "md", "plan"] + +def display_runtimes(runtimes: list[Runtime]): + """ + Display runtimes in a tabular format + """ + import pandas as pd + + records = [] + for runtime in runtimes: + record = dict(id=runtime.id, **runtime.args) + records.append(record) + + return pd.DataFrame(records) + +def display_experiments(experiments: list[base.Experiment]): + """ + Display experiments in a tabular format + """ + import pandas as pd + + records = [] + for experiment in experiments: + record = dict(name=experiment.name, application=experiment.application.app_id, num_tasks=len(experiment.tasks)) + for k, v in experiment.inputs.items(): + record[k] = ", ".join(v) if isinstance(v, list) else str(v) + records.append(record) + + return pd.DataFrame(records) + +def display_plans(plans: list[plan.Plan]): + """ + Display plans in a tabular format + """ + import pandas as pd + + records = [] + for plan in plans: + for task in plan.tasks: + record = dict(plan_id=str(plan.id)) + for k, v in task.model_dump().items(): + record[k] = ", ".join(v) if isinstance(v, list) else str(v) + records.append(record) + + return pd.DataFrame(records) + +def display(arg): + + if isinstance(arg, list): + if all(isinstance(x, Runtime) for x in arg): + return display_runtimes(arg) + if all(isinstance(x, base.Experiment) for x in arg): + return display_experiments(arg) + if all(isinstance(x, plan.Plan) for x in arg): + return display_plans(arg) + else: + if isinstance(arg, Runtime): + return display_runtimes([arg]) + if isinstance(arg, base.Experiment): + return display_experiments([arg]) + if isinstance(arg, plan.Plan): + return display_plans([arg]) + + raise NotImplementedError(f"Cannot display object of type {type(arg)}") \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py index 12e49a7d48..78991de4e8 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/plan.py @@ -40,14 +40,8 @@ class Plan(pydantic.BaseModel): return [Task(**task) if isinstance(task, dict) else task for task in v] return v - def describe(self) -> None: - print(f"Plan(id={self.id}): {len(self.tasks)} tasks") - for task in self.tasks: - print(task) - def __stage_prepare__(self) -> None: print("Preparing execution plan...") - self.describe() def __stage_confirm__(self, silent: bool) -> None: print("Confirming execution plan...") @@ -173,7 +167,8 @@ def load_json(filename: str) -> Plan: model = json.load(f) return Plan(**model) -def load(id: str) -> Plan: +def load(id: str | None) -> Plan: + assert id is not None av = AiravataOperator(context.access_token) az = av.__airavata_token__(av.access_token, av.default_gateway_id()) assert az.accessToken is not None diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py index 40ee36196f..1cdfdb4284 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/runtime.py @@ -163,9 +163,9 @@ class Remote(Runtime): res = requests.post(f"https://{conn_svc_url}/api/v1/agent/executepythonrequest", json={ "libraries": libraries, "code": code, - "pythonVersion": "3.11", # TODO verify + "pythonVersion": "3.10", # TODO verify "keepAlive": False, # TODO verify - "parentExperimentId": task.ref, + "parentExperimentId": "/data", # the working directory "agentId": task.agent_ref, }) data = res.json() @@ -177,8 +177,8 @@ class Remote(Runtime): res = requests.get(f"https://{conn_svc_url}/api/v1/agent/executepythonresponse/{exc_id}") data = res.json() if data["available"]: - files = data["responseString"].split("\n") - return files + response = data["responseString"] + return print(response) time.sleep(1) except Exception as e: print(f"\nRemote execution failed! {e}") @@ -332,7 +332,5 @@ class Remote(Runtime): cluster="login.expanse.sdsc.edu", ) - def list_runtimes(**kwargs) -> list[Runtime]: - # TODO get list using token - return [Remote(cluster="login.expanse.sdsc.edu"), Remote(cluster="anvil.rcac.purdue.edu")] + return [Remote(cluster="login.expanse.sdsc.edu"), Remote(cluster="anvil.rcac.purdue.edu")] \ No newline at end of file diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py index 6d67c2dcf3..a42ebea4e9 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/task.py @@ -40,8 +40,14 @@ class Task(pydantic.BaseModel): def __str__(self) -> str: return f"Task(\nname={self.name}\napp_id={self.app_id}\ninputs={self.inputs}\nruntime={self.runtime}\nref={self.ref}\nagent_ref={self.agent_ref}\nfile_path={self.sr_host}:{self.workdir}\n)" - def launch(self) -> None: - assert self.ref is None + def launch(self, force=True) -> None: + if not force and self.ref is not None: + print(f"[Task] Task {self.name} has already launched: ref={self.ref}") + return + if self.ref is not None: + input("[NOTE] Past runs will be overwritten! Hit Enter to continue...") + self.ref = None + self.agent_ref = None print(f"[Task] Executing {self.name} on {self.runtime}") self.runtime.execute(self) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml index 5a9c418151..14b3f9653f 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "airavata-python-sdk-test" -version = "0.0.5.post1" +version = "0.0.6.post3" description = "Apache Airavata Python SDK" readme = "README.md" license = { text = "Apache License 2.0" } @@ -25,6 +25,7 @@ dependencies = [ "pydantic", "rich", "ipywidgets", + "pandas", ] [tool.setuptools.packages.find] diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb index a44d17507a..7ec5ed6651 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/samples/poc.ipynb @@ -4,17 +4,17 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Airavata Experiment SDK - Molecular Dynamics Example\n", + "# Cybershuttle SDK - Molecular Dynamics\n", + "> Define, run, monitor, and analyze molecular dynamics experiments in a HPC-agnostic way.\n", "\n", - "This SDK allows users to define, plan, and execute molecular dynamics experiments with ease.\n", - "Here we demonstrate how to authenticate, set up a NAMD experiment, add replicas, create an execution plan, and monitor the execution." + "This notebook shows how users can setup and launch a **NAMD** experiment with replicas, monitor its execution, and run analyses both during and after execution." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Install the required packages\n", + "## Installing Required Packages\n", "\n", "First, install the `airavata-python-sdk-test` package from the pip repository." ] @@ -25,8 +25,6 @@ "metadata": {}, "outputs": [], "source": [ - "%pip uninstall -y airavata-python-sdk-test\n", - "%pip cache purge\n", "%pip install -e airavata-api/airavata-client-sdks/airavata-python-sdk" ] }, @@ -34,29 +32,14 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Import the Experiments SDK" + "## Importing the SDK" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "/Users/yasith/projects/artisan/airavata/airavata-api/airavata-client-sdks/airavata-python-sdk/samples\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "using legacy validation callback\n" - ] - } - ], + "outputs": [], "source": [ "%cd airavata-api/airavata-client-sdks/airavata-python-sdk/samples\n", "import airavata_experiments as ae\n", @@ -67,7 +50,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Authenticate for Remote Execution\n", + "## Authenticating\n", "\n", "To authenticate for remote execution, call the `ae.login()` method.\n", "This method will prompt you to enter your credentials and authenticate your session." @@ -86,7 +69,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user can access." + "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user has access to." ] }, { @@ -96,14 +79,14 @@ "outputs": [], "source": [ "runtimes = ae.list_runtimes()\n", - "display(runtimes)" + "ae.display(runtimes)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Upload Experiment Files\n", + "## Uploading Experiment Files\n", "\n", "Drag and drop experiment files onto the workspace that this notebook is run on.\n", "\n", @@ -129,7 +112,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Define a NAMD Experiment\n", + "## Defining a NAMD Experiment\n", "\n", "The `md.NAMD.initialize()` is used to define a NAMD experiment.\n", "Here, provide the paths to the `.conf` file, the `.pdb` file, the `.psf` file, any optional files you want to run NAMD on.\n", @@ -146,7 +129,13 @@ " parallelism: Literal['CPU', 'GPU'] = \"CPU\",\n", " num_replicas: int = 1\n", ") -> Experiment[ExperimentApp]\n", - "```" + "```\n", + "\n", + "To add replica runs, simply call the `exp.add_replica()` function.\n", + "You can call the `add_replica()` function as many times as you want replicas.\n", + "Any optional resource constraint can be provided here.\n", + "\n", + "You can also call `ae.display()` to pretty-print the experiment." ] }, { @@ -171,16 +160,18 @@ " \"data/b4pull.restart.xsc\",\n", " ],\n", " parallelism=\"GPU\",\n", - ")" + ")\n", + "exp.add_replica()\n", + "ae.display(exp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "To add replica runs, simply call the `exp.add_replica()` function.\n", - "You can call the `add_replica()` function as many times as you want replicas.\n", - "Any optional resource constraint can be provided here." + "## Creating an Execution Plan\n", + "\n", + "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan." ] }, { @@ -189,17 +180,17 @@ "metadata": {}, "outputs": [], "source": [ - "exp.add_replica()" + "plan = exp.plan()\n", + "ae.display(plan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Create Execution Plan\n", + "## Saving the Plan\n", "\n", - "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan.\n", - "This plan can be exported in JSON format and imported back." + "A created plan can be saved locally (in JSON) or remotely (in a user-local DB) for later reference." ] }, { @@ -208,15 +199,19 @@ "metadata": {}, "outputs": [], "source": [ - "plan = exp.plan() # this will create a plan for the experiment\n", - "plan.describe() # this will describe the plan" + "plan.save() # this will save the plan in DB\n", + "plan.save_json(\"plan.json\") # save the plan state locally" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Execute the Plan" + "## Launching the Plan\n", + "\n", + "A created plan can be launched using the `plan.launch()` function.\n", + "Changes to plan states will be automatically saved onto the remote.\n", + "However, plan state can also be tracked locally by invoking `plan.save_json()`." ] }, { @@ -225,16 +220,17 @@ "metadata": {}, "outputs": [], "source": [ - "plan.save() # this will save the plan in DB\n", - "plan.launch() # this will launch the plan\n", - "plan.save_json(\"plan.json\") # this will save the plan locally" + "plan.launch()\n", + "plan.save_json(\"plan.json\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Load and Describe the Launched Plan" + "## Loading a Saved Plan\n", + "\n", + "A saved plan can be loaded by calling `ae.plan.load_json(plan_path)` (for local plans) or `ae.plan.load(plan_id)` (for remote plans)." ] }, { @@ -243,166 +239,37 @@ "metadata": {}, "outputs": [], "source": [ - "assert plan.id is not None\n", + "plan = ae.plan.load_json(\"plan.json\")\n", "plan = ae.plan.load(plan.id)\n", - "plan.describe()" + "ae.display(plan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## List all Plans the User Created" + "## Fetching User-Defined Plans\n", + "\n", + "The `ae.plan.query()` function retrieves all plans stored in the remote." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "<div>\n", - "<style scoped>\n", - " .dataframe tbody tr th:only-of-type {\n", - " vertical-align: middle;\n", - " }\n", - "\n", - " .dataframe tbody tr th {\n", - " vertical-align: top;\n", - " }\n", - "\n", - " .dataframe thead th {\n", - " text-align: right;\n", - " }\n", - "</style>\n", - "<table border=\"1\" class=\"dataframe\">\n", - " <thead>\n", - " <tr style=\"text-align: right;\">\n", - " <th></th>\n", - " <th>id</th>\n", - " </tr>\n", - " </thead>\n", - " <tbody>\n", - " <tr>\n", - " <th>0</th>\n", - " <td>16781b12-fd99-496c-b815-c0fdc5889664</td>\n", - " </tr>\n", - " <tr>\n", - " <th>1</th>\n", - " <td>2a09f1c4-8a0a-46e4-bbdd-ffab13be3d5b</td>\n", - " </tr>\n", - " <tr>\n", - " <th>2</th>\n", - " <td>2a7896fa-5898-42f6-92b6-c053e4a702ba</td>\n", - " </tr>\n", - " <tr>\n", - " <th>3</th>\n", - " <td>2e206dab-ada7-45a6-a2ea-940adf9ef646</td>\n", - " </tr>\n", - " <tr>\n", - " <th>4</th>\n", - " <td>4fb8a73b-8333-4c73-8e74-5dd103f8a22f</td>\n", - " </tr>\n", - " <tr>\n", - " <th>5</th>\n", - " <td>5197d68c-63ec-4d13-bac5-24484e1d0ca6</td>\n", - " </tr>\n", - " <tr>\n", - " <th>6</th>\n", - " <td>54b9dcd6-a5e8-4a05-9690-aacd346de55c</td>\n", - " </tr>\n", - " <tr>\n", - " <th>7</th>\n", - " <td>562a195e-83f9-4de4-af5b-c43b4a2a40f6</td>\n", - " </tr>\n", - " <tr>\n", - " <th>8</th>\n", - " <td>768d97d5-233b-4450-a7e3-4df31f1fac3c</td>\n", - " </tr>\n", - " <tr>\n", - " <th>9</th>\n", - " <td>82814692-63fa-48e1-9e26-78b75269f513</td>\n", - " </tr>\n", - " <tr>\n", - " <th>10</th>\n", - " <td>ae70b7d2-294e-44c1-b2d7-8586642e241e</td>\n", - " </tr>\n", - " <tr>\n", - " <th>11</th>\n", - " <td>af3a2094-5bb3-4452-a9c3-45451bfd23cb</td>\n", - " </tr>\n", - " <tr>\n", - " <th>12</th>\n", - " <td>b82ae820-93bc-4e26-b080-2563824a1c5b</td>\n", - " </tr>\n", - " <tr>\n", - " <th>13</th>\n", - " <td>c51d01a2-4b57-47c7-a4d2-91a8ede53c77</td>\n", - " </tr>\n", - " <tr>\n", - " <th>14</th>\n", - " <td>d5db8cc0-76da-4435-9509-3a5733c41d7e</td>\n", - " </tr>\n", - " <tr>\n", - " <th>15</th>\n", - " <td>d6e5e9f0-dc11-4262-b16a-51fef7be42c1</td>\n", - " </tr>\n", - " <tr>\n", - " <th>16</th>\n", - " <td>d763f645-2f8f-460e-9e83-4a98365508eb</td>\n", - " </tr>\n", - " <tr>\n", - " <th>17</th>\n", - " <td>eff68e72-c585-4066-a8a0-d36cc45f648c</td>\n", - " </tr>\n", - " <tr>\n", - " <th>18</th>\n", - " <td>fcc54603-aa0b-4ca7-89ac-c04d0725f4cb</td>\n", - " </tr>\n", - " </tbody>\n", - "</table>\n", - "</div>" - ], - "text/plain": [ - " id\n", - "0 16781b12-fd99-496c-b815-c0fdc5889664\n", - "1 2a09f1c4-8a0a-46e4-bbdd-ffab13be3d5b\n", - "2 2a7896fa-5898-42f6-92b6-c053e4a702ba\n", - "3 2e206dab-ada7-45a6-a2ea-940adf9ef646\n", - "4 4fb8a73b-8333-4c73-8e74-5dd103f8a22f\n", - "5 5197d68c-63ec-4d13-bac5-24484e1d0ca6\n", - "6 54b9dcd6-a5e8-4a05-9690-aacd346de55c\n", - "7 562a195e-83f9-4de4-af5b-c43b4a2a40f6\n", - "8 768d97d5-233b-4450-a7e3-4df31f1fac3c\n", - "9 82814692-63fa-48e1-9e26-78b75269f513\n", - "10 ae70b7d2-294e-44c1-b2d7-8586642e241e\n", - "11 af3a2094-5bb3-4452-a9c3-45451bfd23cb\n", - "12 b82ae820-93bc-4e26-b080-2563824a1c5b\n", - "13 c51d01a2-4b57-47c7-a4d2-91a8ede53c77\n", - "14 d5db8cc0-76da-4435-9509-3a5733c41d7e\n", - "15 d6e5e9f0-dc11-4262-b16a-51fef7be42c1\n", - "16 d763f645-2f8f-460e-9e83-4a98365508eb\n", - "17 eff68e72-c585-4066-a8a0-d36cc45f648c\n", - "18 fcc54603-aa0b-4ca7-89ac-c04d0725f4cb" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ - "import pandas as pd\n", "plans = ae.plan.query()\n", - "display(pd.DataFrame([plan.model_dump(include={\"id\"}) for plan in plans]))" + "ae.display(plans)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Check Plan Status" + "## Checking Plan Status\n", + "\n", + "The plan's execution status can be checked by calling `plan.status()`." ] }, { @@ -418,7 +285,10 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Block Until Plan Completes" + "## Managing Plan Execution\n", + "\n", + "The `plan.stop()` function will stop a currently executing plan.\n", + "The `plan.join()` function would block code execution until the plan completes its execution." ] }, { @@ -427,6 +297,7 @@ "metadata": {}, "outputs": [], "source": [ + "plan.stop()\n", "plan.join()" ] }, @@ -434,23 +305,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Stop Plan Execution" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "plan.stop()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Run File Operations on Plan" + "## Running File Operations" ] }, { diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go index 99d28c8088..5052770733 100644 --- a/modules/agent-framework/airavata-agent/agent.go +++ b/modules/agent-framework/airavata-agent/agent.go @@ -8,15 +8,16 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "log" "net" "net/http" "os" "os/exec" + "strings" "golang.org/x/crypto/ssh" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) func main() { @@ -27,7 +28,7 @@ func main() { grpcStreamChannel := make(chan struct{}) kernelChannel := make(chan struct{}) - conn, err := grpc.Dial(serverUrl, grpc.WithInsecure(), grpc.WithBlock()) + conn, err := grpc.NewClient(serverUrl, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } @@ -127,23 +128,47 @@ func main() { log.Printf("[agent.go] Working Dir %s", workingDir) log.Printf("[agent.go] Libraries %s", libraries) - // TODO: cd into working dir, create the virtual environment with provided libraries - cmd := exec.Command("python3", "-c", code) //TODO: Load python runtime from a config - - output, err := cmd.Output() - if err != nil { - fmt.Println("[agent.go] Failed to run python command:", err) - return - } - - stdoutString := string(output) - if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_PythonExecutionResponse{ - PythonExecutionResponse: &protos.PythonExecutionResponse{ - SessionId: sessionId, - ExecutionId: executionId, - ResponseString: stdoutString}}}); err != nil { - log.Printf("[agent.go] Failed to send execution result to server: %v", err) - } + // bash script to + // (a) create the virtual environment, + // (b) source it, and + // (c) run a python code + bashScript := ` + workingDir="%s"; + cd $workingDir; + if [ ! -f "$workingDir/venv/pyenv.cfg" ]; then + rm -rf $workingDir/venv; + python3 -m venv $workingDir/venv; + fi + source $workingDir/venv/bin/activate + pip install %s > /dev/null + python -c "%s" + ` + + runCmd := fmt.Sprintf( + bashScript, + workingDir, + strings.Join(libraries, " "), + strings.ReplaceAll(code, `"`, `\"`), + ) + log.Printf("[agent.go] Running bash script:\n%s", runCmd) + cmd := exec.Command("bash", "-c", runCmd) + + go func() { + output, err := cmd.Output() + if err != nil { + fmt.Println("[agent.go] Failed to run python command:", err) + return + } + stdoutString := string(output) + log.Printf("[agent.go] Execution output is %s", stdoutString) + if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_PythonExecutionResponse{ + PythonExecutionResponse: &protos.PythonExecutionResponse{ + SessionId: sessionId, + ExecutionId: executionId, + ResponseString: stdoutString}}}); err != nil { + log.Printf("[agent.go] Failed to send execution result to server: %v", err) + } + }() case *protos.ServerMessage_CommandExecutionRequest: log.Printf("[agent.go] Recived a command execution request") @@ -219,7 +244,7 @@ func main() { } }() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { log.Printf("[agent.go] Failed to read response for start jupyter kernel: %v", err) @@ -299,7 +324,7 @@ func main() { }() log.Printf("[agent.go] Sending the jupyter execution " + executionId + "result to server...") - body, err = ioutil.ReadAll(resp.Body) + body, err = io.ReadAll(resp.Body) if err != nil { log.Printf("[agent.go] Failed to read response for run jupyter kernel: %v", err) diff --git a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb index 858f473622..1e5dba8f50 100644 --- a/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb +++ b/modules/agent-framework/deployments/jupyterhub/data/1_experiment_sdk.ipynb @@ -4,16 +4,20 @@ "cell_type": "markdown", "metadata": {}, "source": [ + "# Cybershuttle SDK - Molecular Dynamics\n", + "> Define, run, monitor, and analyze molecular dynamics experiments in a HPC-agnostic way.\n", "# Cybershuttle SDK - Molecular Dynamics\n", "> Define, run, monitor, and analyze molecular dynamics experiments in a HPC-agnostic way.\n", "\n", "This notebook shows how users can setup and launch a **NAMD** experiment with replicas, monitor its execution, and run analyses both during and after execution." + "This notebook shows how users can setup and launch a **NAMD** experiment with replicas, monitor its execution, and run analyses both during and after execution." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "## Installing Required Packages\n", "## Installing Required Packages\n", "\n", "First, install the `airavata-python-sdk-test` package from the pip repository." @@ -33,6 +37,7 @@ "metadata": {}, "source": [ "## Importing the SDK" + "## Importing the SDK" ] }, { @@ -49,6 +54,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ + "## Authenticating\n", "## Authenticating\n", "\n", "To authenticate for remote execution, call the `ae.login()` method.\n", @@ -58,8 +64,10 @@ { "cell_type": "code", "execution_count": null, + "execution_count": null, "metadata": {}, "outputs": [], + "outputs": [], "source": [ "ae.login()" ] @@ -69,6 +77,7 @@ "metadata": {}, "source": [ "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user has access to." + "Once authenticated, the `ae.list_runtimes()` function can be called to list HPC resources that the user has access to." ] }, { @@ -79,6 +88,7 @@ "source": [ "runtimes = ae.list_runtimes()\n", "ae.display(runtimes)" + "ae.display(runtimes)" ] }, { @@ -111,6 +121,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ + "## Defining a NAMD Experiment\n", "## Defining a NAMD Experiment\n", "\n", "The `md.NAMD.initialize()` is used to define a NAMD experiment.\n", @@ -135,6 +146,13 @@ "Any optional resource constraint can be provided here.\n", "\n", "You can also call `ae.display()` to pretty-print the experiment." + "```\n", + "\n", + "To add replica runs, simply call the `exp.add_replica()` function.\n", + "You can call the `add_replica()` function as many times as you want replicas.\n", + "Any optional resource constraint can be provided here.\n", + "\n", + "You can also call `ae.display()` to pretty-print the experiment." ] }, { @@ -158,10 +176,9 @@ " \"data/b4pull.restart.vel\",\n", " \"data/b4pull.restart.xsc\",\n", " ],\n", - " parallelism=\"CPU\",\n", - " num_replicas=1,\n", + " parallelism=\"GPU\",\n", ")\n", - "exp.add_replica(*ae.list_runtimes(cluster=\"login.expanse.sdsc.edu\", category=\"cpu\"))\n", + "exp.add_replica()\n", "ae.display(exp)" ] }, @@ -172,6 +189,9 @@ "## Creating an Execution Plan\n", "\n", "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan." + "## Creating an Execution Plan\n", + "\n", + "Call the `exp.plan()` function to transform the experiment definition + replicas into a stateful execution plan." ] }, { @@ -182,15 +202,19 @@ "source": [ "plan = exp.plan()\n", "ae.display(plan)" + "plan = exp.plan()\n", + "ae.display(plan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "## Saving the Plan\n", "## Saving the Plan\n", "\n", "A created plan can be saved locally (in JSON) or remotely (in a user-local DB) for later reference." + "A created plan can be saved locally (in JSON) or remotely (in a user-local DB) for later reference." ] }, { @@ -199,8 +223,8 @@ "metadata": {}, "outputs": [], "source": [ - "plan.save() # this will save the plan in DB\n", - "plan.save_json(\"plan.json\") # save the plan state locally" + "plan.save() # this will save the plan in DB\n", + "plan.save_json(\"plan.json\") # save the plan state locally" ] }, { @@ -212,6 +236,11 @@ "A created plan can be launched using the `plan.launch()` function.\n", "Changes to plan states will be automatically saved onto the remote.\n", "However, plan state can also be tracked locally by invoking `plan.save_json()`." + "## Launching the Plan\n", + "\n", + "A created plan can be launched using the `plan.launch()` function.\n", + "Changes to plan states will be automatically saved onto the remote.\n", + "However, plan state can also be tracked locally by invoking `plan.save_json()`." ] }, { @@ -222,14 +251,16 @@ "source": [ "plan.launch()\n", "plan.save_json(\"plan.json\")" + "plan.save_json(\"plan.json\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Checking the Plan Status\n", - "The status of a plan can be retrieved by calling `plan.status()`." + "## Loading a Saved Plan\n", + "\n", + "A saved plan can be loaded by calling `ae.plan.load_json(plan_path)` (for local plans) or `ae.plan.load(plan_id)` (for remote plans)." ] }, { @@ -238,37 +269,38 @@ "metadata": {}, "outputs": [], "source": [ - "plan.status()" + "plan = ae.plan.load_json(\"plan.json\")\n", + "plan = ae.plan.load(plan.id)\n", + "ae.display(plan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Loading a Saved Plan\n", + "## Fetching User-Defined Plans\n", "\n", - "A saved plan can be loaded by calling `ae.plan.load_json(plan_path)` (for local plans) or `ae.plan.load(plan_id)` (for remote plans)." + "The `ae.plan.query()` function retrieves all plans stored in the remote." ] }, { "cell_type": "code", "execution_count": null, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "plan = ae.plan.load_json(\"plan.json\")\n", - "plan = ae.plan.load(plan.id)\n", - "plan.status()\n", - "ae.display(plan)" + "plans = ae.plan.query()\n", + "ae.display(plans)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Fetching User-Defined Plans\n", + "## Checking Plan Status\n", "\n", - "The `ae.plan.query()` function retrieves all plans stored in the remote." + "The plan's execution status can be checked by calling `plan.status()`." ] }, { @@ -277,8 +309,7 @@ "metadata": {}, "outputs": [], "source": [ - "plans = ae.plan.query()\n", - "ae.display(plans)" + "plan.status()" ] }, { @@ -288,7 +319,7 @@ "## Managing Plan Execution\n", "\n", "The `plan.stop()` function will stop a currently executing plan.\n", - "The `plan.wait_for_completion()` function would block until the plan finishes executing." + "The `plan.join()` function would block code execution until the plan completes its execution." ] }, { @@ -297,8 +328,15 @@ "metadata": {}, "outputs": [], "source": [ - "# plan.stop()\n", - "# plan.wait_for_completion()" + "plan.stop()\n", + "plan.join()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Running File Operations" ] }, { @@ -318,27 +356,19 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "for task in plan.tasks:\n", - " print(task.name, task.pid)\n", - " display(task.ls()) # list files\n", - " task.upload(\"data/sample.txt\") # upload sample.txt\n", - " display(task.ls()) # list files AFTER upload\n", - " display(task.cat(\"sample.txt\")) # preview sample.txt\n", - " task.download(\"sample.txt\", f\"./results_{task.name}\") # download sample.txt" - ] - }, - { - "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "plan.wait_for_completion() # wait for plan to complete\n", "for task in plan.tasks:\n", - " task.download_all(f\"./results_{task.name}\") # download plan outputs" + " status = task.status()\n", + " print(status)\n", + " # task.upload(\"data/sample.txt\")\n", + " files = task.ls()\n", + " display(files)\n", + " display(task.cat(\"NAMD.stderr\"))\n", + " # task.download(\"NAMD.stdout\", \"./results\")\n", + " task.download(\"NAMD_Repl_1.out\", \"./results\")" ] }, { @@ -360,14 +390,17 @@ "outputs": [], "source": [ "for index, task in enumerate(plan.tasks):\n", - " @task.context(packages=[\"numpy\", \"pandas\"])\n", - " def analyze() -> None:\n", - " import numpy as np\n", - " with open(\"pull.conf\", \"r\") as f:\n", - " data = f.read()\n", - " print(\"pull.conf has\", len(data), \"chars\")\n", - " print(np.arange(10))\n", - " analyze()" + "\n", + " @task.context(packages=[\"matplotlib\", \"pandas\"])\n", + " def analyze(x, y, index, num_tasks) -> None:\n", + " from matplotlib import pyplot as plt\n", + " import pandas as pd\n", + " df = pd.read_csv(\"data.csv\")\n", + " plt.figure(figsize=(x, y))\n", + " plt.plot(df[\"x\"], df[\"y\"], marker=\"o\", linestyle=\"-\", linewidth=2, markersize=6)\n", + " plt.title(f\"Plot for Replica {index} of {num_tasks}\")\n", + "\n", + " analyze(3, 4, index+1, len(plan.tasks))" ] } ],
