This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 397f1cdaf65ec8a1416cca96281d699fd1ca3163
Author: yasith <[email protected]>
AuthorDate: Tue Feb 18 03:18:07 2025 -0600

    add file ops to jupyter magics, cleanup code, update agent.sh
---
 .../airavata-agent/jupyter/Dockerfile              |   5 +-
 .../jupyter/labconfig/airavata_magics.py           | 230 +++++++++++++++------
 .../jupyter/labconfig/device_auth.py               |  59 ++++++
 .../airavata-agent/scripts/agent.sh                |  48 +++--
 4 files changed, 261 insertions(+), 81 deletions(-)

diff --git a/modules/agent-framework/airavata-agent/jupyter/Dockerfile b/modules/agent-framework/airavata-agent/jupyter/Dockerfile
index e0a42a7240..66f92f50df 100644
--- a/modules/agent-framework/airavata-agent/jupyter/Dockerfile
+++ b/modules/agent-framework/airavata-agent/jupyter/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:slim
+FROM python:3.12-slim
 
 RUN apt update
 RUN apt install fuse -y
@@ -15,9 +15,10 @@ RUN git clone https://github.com/cyber-shuttle/jupyter-notebook-examples /home/j
 COPY labconfig/jupyter_lab_config.py /jupyter_lab_config.py
 COPY labconfig/airavata_magics.py /airavata_magics.py
 COPY labconfig/__init__.py /__init__.py
+COPY labconfig/device_auth.py /device_auth.py
 COPY labconfig/bootstrap.sh /bootstrap.sh
 RUN chmod +x /bootstrap.sh
-# COPY fuse/client /client
+
 EXPOSE 8888
 WORKDIR /home
 
diff --git a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py
index d2b34d5d6a..6d0708f6c5 100644
--- a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py
+++ b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py
@@ -1,53 +1,76 @@
-from IPython.core.magic import register_cell_magic
-from IPython.core.magic import register_line_magic
-
-from IPython.display import display, Image, HTML
 import base64
-import requests
+import binascii
 import json
-import jwt
 import time
 from pathlib import Path
+from typing import NamedTuple
+import jwt
 import os
+import requests
+from IPython.core.magic import register_cell_magic, register_line_magic
+from IPython.display import HTML, Image, display
+from device_auth import DeviceFlowAuthenticator
+
+
+AgentInfo = NamedTuple('AgentInfo', [
+    ('agentId', str),
+    ('experimentId', str),
+    ('processId', str),
+    ('cluster', str),
+    ('queue', str),
+    ('cpus', int),
+    ('memory', int),
+    ('walltime', int),
+    ('gateway_id', str),
+    ('group', str),
+])
+api_base_url = "https://api.gateway.cybershuttle.org"
+file_server_url = "http://3.142.234.94:8050"
+current_agent : AgentInfo | None = None
+MSG_NOT_INITIALIZED = r"Remote agent not initialized. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes> group=<group>"
+
+
+def get_access_token() -> str | None:
+  token_from_env = os.getenv('CS_ACCESS_TOKEN')
+  if token_from_env:
+      return token_from_env
+  EXPLICIT_TOKEN_FILE = Path("~").expanduser() / "csagent" / "token" / "keys.json"
+  if EXPLICIT_TOKEN_FILE.exists():
+    with open(EXPLICIT_TOKEN_FILE, "r") as f:
+      return json.load(f).get("access_token")
+
+
+def get_agent_status() -> dict | None:
+  if not current_agent:
+    return print(MSG_NOT_INITIALIZED)
+  url = f"{api_base_url}/api/v1/agent/{current_agent.agentId}"
+  response = requests.get(url)
+  if response.status_code == 202:
+    return response.json()
+  return print(f"Got [{response.status_code}] Response: {response.text}")
+
+
+def get_process_id(experiment_id: str, headers) -> str:
+    """
+    Get process id by experiment id
+
+    """
+    url = f"{api_base_url}/api/v1/exp/{experiment_id}/process"
+    process_id = ""
+    while not process_id:
+        response = requests.get(url, headers=headers)
+        if response.status_code == 200:
+            process_id = response.json().get("processId")
+        else:
+            time.sleep(5)
+    return process_id
 
-current_agent_info = None
-
-EXPLICIT_TOKEN_FILE = (
-        Path(os.path.expanduser("~")) / "csagent" / "token" / "keys.json"
-)
-
-def read_explicit_token_file():
-    if not EXPLICIT_TOKEN_FILE.exists():
-        return None
-    else:
-        with open(EXPLICIT_TOKEN_FILE, "r") as f:
-            return json.load(f)
-
-def get_access_token():
-    expl_token_data = read_explicit_token_file()
-    if expl_token_data:
-        return expl_token_data["access_token"]
-
-def get_agent_status():
-    global current_agent_info
 
-    if not current_agent_info:
-        print("No agent was scheduled yet. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes>")
-        return
+def submit_agent_job(experiment_name, cluster, queue, cpus, memory, walltime, access_token, gateway_id='default', group='Default'):
+    global current_agent
 
-    url = 'https://api.gateway.cybershuttle.org/api/v1/agent/' + current_agent_info['agentId']
-    response = requests.get(url)
-    if response.status_code == 202:
-        return response.json()
-    else:
-        print('Invalid response reveived. Status code:', response.status_code)
-        print('Response:', response.text)
-
-def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, access_token, gateway_id='testdrive'):
-
-    global current_agent_info
     # URL to which the POST request will be sent
-    url = 'https://api.gateway.cybershuttle.org/api/v1/exp/launch'
+    url = api_base_url + '/api/v1/exp/launch'
 
     # Data to be sent in the POST request
     data = {
@@ -56,7 +79,7 @@ def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, ac
         'cpuCount': cpus,
         'nodeCount': 1,
         'memory': memory,
-        'wallTime': wallTime,
+        'wallTime': walltime,
         'queue': queue
     }
 
@@ -83,19 +106,32 @@ def submit_agent_job(experiment_name, cluster, queue, cpus, memory, wallTime, ac
     # Check if the request was successful
     if response.status_code == 200:
         # Parse the JSON response
-        response_data = response.json()
-        print('Response:', response_data)
-        current_agent_info = response_data
+        obj = response.json()
+        current_agent = AgentInfo(
+            agentId=obj['agentId'],
+            experimentId=obj['experimentId'],
+            processId=get_process_id(obj['experimentId'], headers=headers),
+            cluster=cluster,
+            queue=queue,
+            cpus=cpus,
+            memory=memory,
+            walltime=walltime,
+            gateway_id=gateway_id,
+            group=group,
+        )
+        print('Agent Initialized:', current_agent)
     else:
         print('Failed to send POST request. Status code:', response.status_code)
         print('Response:', response.text)
 
-def terminate_agent(access_token, gateway_id='testdrive'):
 
-    global current_agent_info
+def terminate_agent(access_token, gateway_id='default'):
+    global current_agent
+    if not current_agent:
+      return print(MSG_NOT_INITIALIZED)
 
-    expId = current_agent_info['experimentId']
-    url = 'https://api.gateway.cybershuttle.org/api/v1/exp/terminate/' + expId
+    expId = current_agent.experimentId
+    url = api_base_url + '/api/v1/exp/terminate/' + expId
 
     decode = jwt.decode(access_token, options={"verify_signature": False})
     user_id = decode['preferred_username']
@@ -118,7 +154,7 @@ def terminate_agent(access_token, gateway_id='testdrive'):
         # Parse the JSON response
         response_data = response.json()
         print('Agent terminated:', response_data)
-        current_agent_info = None
+        current_agent = None
     else:
         print('Failed to send termination request. Status code:', response.status_code)
         print('Response:', response.text)
@@ -126,19 +162,17 @@ def terminate_agent(access_token, gateway_id='testdrive'):
 
 @register_cell_magic
 def run_remote(line, cell):
+    global current_agent
+    if not current_agent:
+      return print(MSG_NOT_INITIALIZED)
 
-    global current_agent_info
-    if not current_agent_info:
-        print("No agent was scheduled yet. Please run %init_remote cluster=<cluster> cpu=<cpu> memory=<memory mb> queue=<queue> walltime=<walltime minutes>")
-        return
-
-    url = 'https://api.gateway.cybershuttle.org/api/v1/agent/executejupyterrequest'
+    url = api_base_url + '/api/v1/agent/executejupyterrequest'
 
     data = {
         "sessionId": "session1",
         "keepAlive": True,
         "code": cell,
-        "agentId": current_agent_info["agentId"]
+        "agentId": current_agent.agentId
     }
 
     json_data = json.dumps(data)
@@ -150,7 +184,7 @@ def run_remote(line, cell):
         print("Cell execution failed. Error: " + error)
     if execution_id:
         while True:
-            url = "https://api.gateway.cybershuttle.org/api/v1/agent/executejupyterresponse/" + execution_id
+            url = api_base_url + "/api/v1/agent/executejupyterresponse/" + execution_id
             response = requests.get(url, headers={'Accept': 'application/json'})
             json_response = response.json()
             if json_response.get('available'):
@@ -171,7 +205,7 @@ def run_remote(line, cell):
                                 try:
                                     image_bytes = base64.b64decode(image_data)
                                     display(Image(data=image_bytes, format='png'))
-                                except base64.binascii.Error as e:
+                                except binascii.Error as e:
                                     print(f"Failed to decode image data: {e}")
                             # Ignoring any texts in the display data
                             # if 'text/plain' in data_obj:
@@ -186,7 +220,7 @@ def run_remote(line, cell):
                                     color: #a71d5d;
                                     background-color: #fdd;
                                     border: 1px solid #a71d5d;
-                                    padding: 10px;
+                                    padding: 5px;
                                     border-radius: 5px;
                                     font-family: Consolas, 'Courier New', monospace;
                                     white-space: pre-wrap;
@@ -208,7 +242,7 @@ def run_remote(line, cell):
                                 color: #a71d5d;
                                 background-color: #fdd;
                                 border: 1px solid #a71d5d;
-                                padding: 10px;
+                                padding: 5px;
                                 border-radius: 5px;
                                 font-family: Consolas, 'Courier New', monospace;
                             ">
@@ -237,15 +271,24 @@ def run_remote(line, cell):
                             try:
                                 image_bytes = base64.b64decode(image_data)
                                 display(Image(data=image_bytes, format='png'))
-                            except base64.binascii.Error as e:
+                            except binascii.Error as e:
                                 print(f"Failed to decode image data: {e}")
                 break
             time.sleep(1)
 
+
 @register_line_magic
-def init_remote(line):
+def cs_login(line):
+    try:
+        authenticator = DeviceFlowAuthenticator()
+        authenticator.login()
+    except ValueError as e:
+        print(f"Configuration error: {e}")
+
 
-    if current_agent_info:
+@register_line_magic
+def init_remote(line):
+    if current_agent:
         status = get_agent_status()
         if status:
             if status['agentUp']:
@@ -280,6 +323,7 @@ def init_remote(line):
 
     submit_agent_job('CS_Agent', cluster_value, queue_value, cpu_value, memory_value, walltime_value, access_token)
 
+
 @register_line_magic
 def status_remote(line):
     status = get_agent_status()
@@ -287,18 +331,70 @@ def status_remote(line):
         if status['agentUp']:
             print("Agent", status['agentId'], 'is running')
         else:
-            print("Agent", status['agentId'], 'is still prepairing. Please wait')
+            print("Agent", status['agentId'], 'is still preparing. Please wait')
+
 
 @register_line_magic
 def terminate_remote(line):
-    global current_agent_info
+    global current_agent
     access_token = get_access_token()
-    if current_agent_info:
+    if current_agent:
         terminate_agent(access_token)
 
 
+@register_line_magic
+def push_remote(line):
+    if not current_agent:
+      return print(MSG_NOT_INITIALIZED)
+    pairs = line.split()
+    remot_path = None
+    local_path = None
+    for pair in pairs:
+        if pair.startswith("source="):
+            local_path = pair.split("=")[1]
+        if pair.startswith("target="):
+            remot_path = pair.split("=")[1]
+    # validate paths
+    if not remot_path or not local_path:
+      return print("Please provide paths for both source and target")
+    # upload file
+    print(f"Pushing local:{local_path} to remote:{remot_path}")
+    url = f"{file_server_url}/upload/live/{current_agent.processId}/{remot_path}"
+    with open(local_path, "rb") as file:
+      files = {"file": file}
+      response = requests.post(url, files=files)
+    print(f"[{response.status_code}] Uploaded local:{local_path} to remote:{remot_path}")
+
+
+@register_line_magic
+def pull_remote(line):
+    if not current_agent:
+      return print(MSG_NOT_INITIALIZED)
+    pairs = line.split()
+    remot_path = None
+    local_path = None
+    for pair in pairs:
+        if pair.startswith("source="):
+            remot_path = pair.split("=")[1]
+        if pair.startswith("target="):
+            local_path = pair.split("=")[1]
+    # validate paths
+    if not remot_path or not local_path:
+      return print("Please provide paths for both source and target")
+    # download file
+    print(f"Pulling remote:{remot_path} to local:{local_path}")
+    url = f"{file_server_url}/download/live/{current_agent.processId}/{remot_path}"
+    response = requests.get(url)
+    with open(local_path, "wb") as file:
+      file.write(response.content)
+    print(f"[{response.status_code}] Downloaded remote:{remot_path} to local:{local_path}")
+
+
 def load_ipython_extension(ipython):
+    ipython.register_magic_function(cs_login)
     ipython.register_magic_function(init_remote)
     ipython.register_magic_function(status_remote)
     ipython.register_magic_function(terminate_remote)
     ipython.register_magic_function(run_remote)
+    ipython.register_magic_function(push_remote)
+    ipython.register_magic_function(pull_remote)
diff --git a/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py b/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py
new file mode 100644
index 0000000000..58f325b387
--- /dev/null
+++ b/modules/agent-framework/airavata-agent/jupyter/labconfig/device_auth.py
@@ -0,0 +1,59 @@
+import requests
+import time
+import os
+# Load environment variables from .env file
+
+class DeviceFlowAuthenticator:
+    def __init__(self):
+        self.client_id = "cybershuttle-agent"
+        self.realm = "default"
+        self.auth_server_url = "https://auth.cybershuttle.org"
+
+        if not self.client_id or not self.realm or not self.auth_server_url:
+            raise ValueError("Missing required environment variables for client ID, realm, or auth server URL")
+
+        self.device_code = None
+        self.interval = None
+
+    def login(self):
+        # Step 1: Request device and user code
+        auth_device_url = f"{self.auth_server_url}/realms/{self.realm}/protocol/openid-connect/auth/device"
+        response = requests.post(auth_device_url, data={"client_id": self.client_id, "scope": "openid"})
+
+        if response.status_code != 200:
+            print(f"Error in device authorization request: {response.status_code} - {response.text}")
+            return
+
+        data = response.json()
+        self.device_code = data.get("device_code")
+        self.interval = data.get("interval", 5)
+
+        print(f"User code: {data.get('user_code')}")
+        print(f"Please authenticate by visiting: {data.get('verification_uri_complete')}")
+
+        # Step 2: Poll for the token
+        self.poll_for_token()
+
+    def poll_for_token(self):
+        assert self.interval is not None
+        token_url = f"{self.auth_server_url}/realms/{self.realm}/protocol/openid-connect/token"
+        while True:
+            response = requests.post(token_url, data={
+                "client_id": self.client_id,
+                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+                "device_code": self.device_code
+            })
+
+            if response.status_code == 200:
+                data = response.json()
+                access_token = data.get("access_token")
+                print(f"Received access token")
+                os.environ['CS_ACCESS_TOKEN'] = access_token
+                break
+            elif response.status_code == 400 and response.json().get("error") == "authorization_pending":
+                print("Authorization pending, retrying...")
+            else:
+                print(f"Error in token request: {response.status_code} - {response.text}")
+                break
+
+            time.sleep(self.interval)
diff --git a/modules/agent-framework/airavata-agent/scripts/agent.sh b/modules/agent-framework/airavata-agent/scripts/agent.sh
index f3dc37ea64..0e3dbc70b6 100644
--- a/modules/agent-framework/airavata-agent/scripts/agent.sh
+++ b/modules/agent-framework/airavata-agent/scripts/agent.sh
@@ -1,17 +1,41 @@
-#!/bin/sh -x
+#!/bin/bash -x
 
-while getopts i:d: option
- do
+# #####################################################################
+# Standalone Airavata Agent
+# #####################################################################
+#
+# ----------------------------------------------------------------------
+# CONTRIBUTORS
+# ----------------------------------------------------------------------
+# * Dimuthu Wannipurage
+# * Lahiru Jayathilake
+# * Yasith Jayawardana
+# ######################################################################
+
+#-----------------------------------------------------------------------
+# STEP 1 - PARSE COMMAND LINE ARGS
+#-----------------------------------------------------------------------
+while getopts a:s:p: option; do
   case $option in
-    i ) AgentId=$OPTARG ;;
-    d ) ServerUrl=$OPTARG ;;
-     \? ) cat << ENDCAT1
->! Usage: $0  [-i Agent ID ]    !<
->!            [-d  Server URL ]      !<
-ENDCAT1
-#   exit 1 ;;
+  a) AGENT_ID=$OPTARG ;;
+  s) SERVER_URL=$OPTARG ;;
+  p) PROCESS_ID=$OPTARG ;;
+  \?) cat <<ENDCAT ;;
+>! Usage: $0  [-a AGENT_ID ]    !<
+>!            [-s SERVER_URL]   !<
+>!            [-p PROCESS_ID]   !<
+ENDCAT
   esac
 done
 
-module load singularitypro
-singularity exec /expanse/lustre/scratch/gridchem/temp_project/containers/airavata-agent.sif /opt/airavata-agent $ServerUrl:19900 $AgentId
\ No newline at end of file
+echo "AGENT_ID=$AGENT_ID"
+echo "SERVER_URL=$SERVER_URL"
+echo "PROCESS_ID=$PROCESS_ID"
+
+# ----------------------------------------------------------------------
+# STEP 2 - RUN AGENT
+# ----------------------------------------------------------------------
+SCRATCH_DIR="/home/x-scigap/scratch/cs_workdirs"
+SIF_PATH="/home/x-scigap/agent-framework/container/airavata-agent.sif"
+singularity exec --bind $SCRATCH_DIR:/scratch $SIF_PATH \
+  /opt/airavata-agent "$SERVER_URL":19900 "$AGENT_ID"
\ No newline at end of file

Reply via email to