This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 397f1cdaf65ec8a1416cca96281d699fd1ca3163 Author: yasith <[email protected]> AuthorDate: Tue Feb 18 03:18:07 2025 -0600 add file ops to jupyter magics, cleanup code, update agent.sh --- .../airavata-agent/jupyter/Dockerfile | 5 +- .../jupyter/labconfig/airavata_magics.py | 230 +++++++++++++++------ .../jupyter/labconfig/device_auth.py | 59 ++++++ .../airavata-agent/scripts/agent.sh | 48 +++-- 4 files changed, 261 insertions(+), 81 deletions(-) diff --git a/modules/agent-framework/airavata-agent/jupyter/Dockerfile b/modules/agent-framework/airavata-agent/jupyter/Dockerfile index e0a42a7240..66f92f50df 100644 --- a/modules/agent-framework/airavata-agent/jupyter/Dockerfile +++ b/modules/agent-framework/airavata-agent/jupyter/Dockerfile @@ -1,4 +1,4 @@ -FROM python:slim +FROM python:3.12-slim RUN apt update RUN apt install fuse -y @@ -15,9 +15,10 @@ RUN git clone https://github.com/cyber-shuttle/jupyter-notebook-examples /home/j COPY labconfig/jupyter_lab_config.py /jupyter_lab_config.py COPY labconfig/airavata_magics.py /airavata_magics.py COPY labconfig/__init__.py /__init__.py +COPY labconfig/device_auth.py /device_auth.py COPY labconfig/bootstrap.sh /bootstrap.sh RUN chmod +x /bootstrap.sh -# COPY fuse/client /client + EXPOSE 8888 WORKDIR /home diff --git a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py index d2b34d5d6a..6d0708f6c5 100644 --- a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py +++ b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py @@ -1,53 +1,76 @@ -from IPython.core.magic import register_cell_magic -from IPython.core.magic import register_line_magic - -from IPython.display import display, Image, HTML import base64 -import requests +import binascii import json -import jwt import time from pathlib import Path +from typing import NamedTuple +import jwt import os +import requests +from IPython.core.magic import register_cell_magic, register_line_magic +from IPython.display import HTML, Image, display +from device_auth import DeviceFlowAuthenticator + + +AgentInfo = NamedTuple('AgentInfo', [ + ('agentId', str), + ('experimentId', str), + ('processId', str), + ('cluster', str), + ('queue', str), + ('cpus', int), + ('memory', int), + ('walltime', int), + ('gateway_id', str), + ('group', str), +]) +api_base_url = "https://api.gateway.cybershuttle.org" +file_server_url = "http://3.142.234.94:8050" +current_agent : AgentInfo | None = None +MSG_NOT_INITIALIZED = r"Remote agent not initialized. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes> group=<group>" + + +def get_access_token() -> str | None: + token_from_env = os.getenv('CS_ACCESS_TOKEN') + if token_from_env: + return token_from_env + EXPLICIT_TOKEN_FILE = Path("~").expanduser() / "csagent" / "token" / "keys.json" + if EXPLICIT_TOKEN_FILE.exists(): + with open(EXPLICIT_TOKEN_FILE, "r") as f: + return json.load(f).get("access_token") + + +def get_agent_status() -> dict | None: + if not current_agent: + return print(MSG_NOT_INITIALIZED) + url = f"{api_base_url}/api/v1/agent/{current_agent.agentId}" + response = requests.get(url) + if response.status_code == 202: + return response.json() + return print(f"Got [{response.status_code}] Response: {response.text}") + + +def get_process_id(experiment_id: str, headers) -> str: + """ + Get process id by experiment id + + """ + url = f"{api_base_url}/api/v1/exp/{experiment_id}/process" + process_id = "" + while not process_id: + response = requests.get(url, headers=headers) + if response.status_code == 200: + process_id = response.json().get("processId") + else: + time.sleep(5) + return process_id -current_agent_info = None - -EXPLICIT_TOKEN_FILE = ( - Path(os.path.expanduser("~")) / "csagent" / "token" / "keys.json" -) - -def read_explicit_token_file(): - if not EXPLICIT_TOKEN_FILE.exists(): - return None - else: - with open(EXPLICIT_TOKEN_FILE, "r") as f: - return json.load(f) - -def get_access_token(): - expl_token_data = read_explicit_token_file() - if expl_token_data: - return expl_token_data["access_token"] - -def get_agent_status(): - global current_agent_info - if not current_agent_info: - print("No agent was scheduled yet. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes>") - return +def submit_agent_job(experiment_name, cluster, queue, cpus, memory, walltime, access_token, gateway_id='default', group='Default'): + global current_agent - url = 'https://api.gateway.cybershuttle.org/api/v1/agent/' + current_agent_info['agentId'] - response = requests.get(url) - if response.status_code == 202: - return response.json() - else: - print('Invalid response reveived. Status code:', response.status_code) - print('Response:', response.text) - -def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, access_token, gateway_id='testdrive'): - - global current_agent_info # URL to which the POST request will be sent - url = 'https://api.gateway.cybershuttle.org/api/v1/exp/launch' + url = api_base_url + '/api/v1/exp/launch' # Data to be sent in the POST request data = { @@ -56,7 +79,7 @@ def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, ac 'cpuCount': cpus, 'nodeCount': 1, 'memory': memory, - 'wallTime': wallTime, + 'wallTime': walltime, 'queue': queue } @@ -83,19 +106,32 @@ def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, ac # Check if the request was successful if response.status_code == 200: # Parse the JSON response - response_data = response.json() - print('Response:', response_data) - current_agent_info = response_data + obj = response.json() + current_agent = AgentInfo( + agentId=obj['agentId'], + experimentId=obj['experimentId'], + processId=get_process_id(obj['experimentId'], headers=headers), + cluster=cluster, + queue=queue, + cpus=cpus, + memory=memory, + walltime=walltime, + gateway_id=gateway_id, + group=group, + ) + print('Agent Initialized:', current_agent) else: print('Failed to send POST request. Status code:', response.status_code) print('Response:', response.text) -def terminate_agent(access_token, gateway_id='testdrive'): - global current_agent_info +def terminate_agent(access_token, gateway_id='default'): + global current_agent + if not current_agent: + return print(MSG_NOT_INITIALIZED) - expId = current_agent_info['experimentId'] - url = 'https://api.gateway.cybershuttle.org/api/v1/exp/terminate/' + expId + expId = current_agent.experimentId + url = api_base_url + '/api/v1/exp/terminate/' + expId decode = jwt.decode(access_token, options={"verify_signature": False}) user_id = decode['preferred_username'] @@ -118,7 +154,7 @@ def terminate_agent(access_token, gateway_id='testdrive'): # Parse the JSON response response_data = response.json() print('Agent terminated:', response_data) - current_agent_info = None + current_agent = None else: print('Failed to send termination request. Status code:', response.status_code) print('Response:', response.text) @@ -126,19 +162,17 @@ def terminate_agent(access_token, gateway_id='testdrive'): @register_cell_magic def run_remote(line, cell): + global current_agent + if not current_agent: + return print(MSG_NOT_INITIALIZED) - global current_agent_info - if not current_agent_info: - print("No agent was scheduled yet. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes>") - return - - url = 'https://api.gateway.cybershuttle.org/api/v1/agent/executejupyterrequest' + url = api_base_url + '/api/v1/agent/executejupyterrequest' data = { "sessionId": "session1", "keepAlive": True, "code": cell, - "agentId": current_agent_info["agentId"] + "agentId": current_agent.agentId } json_data = json.dumps(data) @@ -150,7 +184,7 @@ def run_remote(line, cell): print("Cell execution failed. Error: " + error) if execution_id: while True: - url = "https://api.gateway.cybershuttle.org/api/v1/agent/executejupyterresponse/" + execution_id + url = api_base_url + "/api/v1/agent/executejupyterresponse/" + execution_id response = requests.get(url, headers={'Accept': 'application/json'}) json_response = response.json() if json_response.get('available'): @@ -171,7 +205,7 @@ def run_remote(line, cell): try: image_bytes = base64.b64decode(image_data) display(Image(data=image_bytes, format='png')) - except base64.binascii.Error as e: + except binascii.Error as e: print(f"Failed to decode image data: {e}") # Ignoring any texts in the display data # if 'text/plain' in data_obj: @@ -186,7 +220,7 @@ def run_remote(line, cell): color: #a71d5d; background-color: #fdd; border: 1px solid #a71d5d; - padding: 10px; + padding: 5px; border-radius: 5px; font-family: Consolas, 'Courier New', monospace; white-space: pre-wrap; @@ -208,7 +242,7 @@ def run_remote(line, cell): color: #a71d5d; background-color: #fdd; border: 1px solid #a71d5d; - padding: 10px; + padding: 5px; border-radius: 5px; font-family: Consolas, 'Courier New', monospace; "> @@ -237,15 +271,24 @@ def run_remote(line, cell): try: image_bytes = base64.b64decode(image_data) display(Image(data=image_bytes, format='png')) - except base64.binascii.Error as e: + except binascii.Error as e: print(f"Failed to decode image data: {e}") break time.sleep(1) + @register_line_magic -def init_remote(line): +def cs_login(line): + try: + authenticator = DeviceFlowAuthenticator() + authenticator.login() + except ValueError as e: + print(f"Configuration error: {e}") + - if current_agent_info: +@register_line_magic +def init_remote(line): + if current_agent: status = get_agent_status() if status: if status['agentUp']: @@ -280,6 +323,7 @@ def init_remote(line): submit_agent_job('CS_Agent', cluster_value, queue_value, cpu_value, memory_value, walltime_value, access_token) + @register_line_magic def status_remote(line): status = get_agent_status() @@ -287,18 +331,70 @@ def status_remote(line): if status['agentUp']: print("Agent", status['agentId'], 'is running') else: - print("Agent", status['agentId'], 'is still prepairing. Please wait') + print("Agent", status['agentId'], 'is still preparing. Please wait') + @register_line_magic def terminate_remote(line): - global current_agent_info + global current_agent access_token = get_access_token() - if current_agent_info: + if current_agent: terminate_agent(access_token) +@register_line_magic +def push_remote(line): + if not current_agent: + return print(MSG_NOT_INITIALIZED) + pairs = line.split() + remot_path = None + local_path = None + for pair in pairs: + if pair.startswith("source="): + local_path = pair.split("=")[1] + if pair.startswith("target="): + remot_path = pair.split("=")[1] + # validate paths + if not remot_path or not local_path: + return print("Please provide paths for both source and target") + # upload file + print(f"Pushing local:{local_path} to remote:{remot_path}") + url = f"{file_server_url}/upload/live/{current_agent.processId}/{remot_path}" + with open(local_path, "rb") as file: + files = {"file": file} + response = requests.post(url, files=files) + print(f"[{response.status_code}] Uploaded local:{local_path} to remote:{remot_path}") + + +@register_line_magic +def pull_remote(line): + if not current_agent: + return print(MSG_NOT_INITIALIZED) + pairs = line.split() + remot_path = None + local_path = None + for pair in pairs: + if pair.startswith("source="): + remot_path = pair.split("=")[1] + if pair.startswith("target="): + local_path = pair.split("=")[1] + # validate paths + if not remot_path or not local_path: + return print("Please provide paths for both source and target") + # download file + print(f"Pulling remote:{remot_path} to local:{local_path}") + url = f"{file_server_url}/download/live/{current_agent.processId}/{remot_path}" + response = requests.get(url) + with open(local_path, "wb") as file: + file.write(response.content) + print(f"[{response.status_code}] Downloaded remote:{remot_path} to local:{local_path}") + + def load_ipython_extension(ipython): + ipython.register_magic_function(cs_login) ipython.register_magic_function(init_remote) ipython.register_magic_function(status_remote) ipython.register_magic_function(terminate_remote) ipython.register_magic_function(run_remote) + ipython.register_magic_function(push_remote) + ipython.register_magic_function(pull_remote) diff --git a/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py b/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py new file mode 100644 index 0000000000..58f325b387 --- /dev/null +++ b/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py @@ -0,0 +1,59 @@ +import requests +import time +import os +# Load environment variables from .env file + +class DeviceFlowAuthenticator: + def __init__(self): + self.client_id = "cybershuttle-agent" + self.realm = "default" + self.auth_server_url = "https://auth.cybershuttle.org" + + if not self.client_id or not self.realm or not self.auth_server_url: + raise ValueError("Missing required environment variables for client ID, realm, or auth server URL") + + self.device_code = None + self.interval = None + + def login(self): + # Step 1: Request device and user code + auth_device_url = f"{self.auth_server_url}/realms/{self.realm}/protocol/openid-connect/auth/device" + response = requests.post(auth_device_url, data={"client_id": self.client_id, "scope": "openid"}) + + if response.status_code != 200: + print(f"Error in device authorization request: {response.status_code} - {response.text}") + return + + data = response.json() + self.device_code = data.get("device_code") + self.interval = data.get("interval", 5) + + print(f"User code: {data.get('user_code')}") + print(f"Please authenticate by visiting: {data.get('verification_uri_complete')}") + + # Step 2: Poll for the token + self.poll_for_token() + + def poll_for_token(self): + assert self.interval is not None + token_url = f"{self.auth_server_url}/realms/{self.realm}/protocol/openid-connect/token" + while True: + response = requests.post(token_url, data={ + "client_id": self.client_id, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "device_code": self.device_code + }) + + if response.status_code == 200: + data = response.json() + access_token = data.get("access_token") + print(f"Received access token") + os.environ['CS_ACCESS_TOKEN'] = access_token + break + elif response.status_code == 400 and response.json().get("error") == "authorization_pending": + print("Authorization pending, retrying...") + else: + print(f"Error in token request: {response.status_code} - {response.text}") + break + + time.sleep(self.interval) diff --git a/modules/agent-framework/airavata-agent/scripts/agent.sh b/modules/agent-framework/airavata-agent/scripts/agent.sh index f3dc37ea64..0e3dbc70b6 100644 --- a/modules/agent-framework/airavata-agent/scripts/agent.sh +++ b/modules/agent-framework/airavata-agent/scripts/agent.sh @@ -1,17 +1,41 @@ -#!/bin/sh -x +#!/bin/bash -x -while getopts i:d: option - do +# ##################################################################### +# Standalone Airavata Agent +# ##################################################################### +# +# ---------------------------------------------------------------------- +# CONTRIBUTORS +# ---------------------------------------------------------------------- +# * Dimuthu Wannipurage +# * Lahiru Jayathilake +# * Yasith Jayawardana +# ###################################################################### + +#----------------------------------------------------------------------- +# STEP 1 - PARSE COMMAND LINE ARGS +#----------------------------------------------------------------------- +while getopts a:s:p: option; do case $option in - i ) AgentId=$OPTARG ;; - d ) ServerUrl=$OPTARG ;; - \? ) cat << ENDCAT1 ->! Usage: $0 [-i Agent ID ] !< ->! [-d Server URL ] !< -ENDCAT1 -# exit 1 ;; + a) AGENT_ID=$OPTARG ;; + s) SERVER_URL=$OPTARG ;; + p) PROCESS_ID=$OPTARG ;; + \?) cat <<ENDCAT ;; +>! Usage: $0 [-a AGENT_ID ] !< +>! [-s SERVER_URL] !< +>! [-p PROCESS_ID] !< +ENDCAT esac done -module load singularitypro -singularity exec /expanse/lustre/scratch/gridchem/temp_project/containers/airavata-agent.sif /opt/airavata-agent $ServerUrl:19900 $AgentId \ No newline at end of file +echo "AGENT_ID=$AGENT_ID" +echo "SERVER_URL=$SERVER_URL" +echo "PROCESS_ID=$PROCESS_ID" + +# ---------------------------------------------------------------------- +# STEP 2 - RUN AGENT +# ---------------------------------------------------------------------- +SCRATCH_DIR="/home/x-scigap/scratch/cs_workdirs" +SIF_PATH="/home/x-scigap/agent-framework/container/airavata-agent.sif" +singularity exec --bind $SCRATCH_DIR:/scratch $SIF_PATH \ + /opt/airavata-agent "$SERVER_URL":19900 "$AGENT_ID" \ No newline at end of file
