This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch cybershuttle-staging
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 9c38135d6cc19f1f27ff53b54a9feca3b8d85b41
Author: yasith <[email protected]>
AuthorDate: Thu Apr 3 04:29:52 2025 -0400

    refine agent.go, fix bugs
---
 .../airavata_experiments/airavata.py               | 106 ++++++++++++---------
 .../airavata_jupyter_magic/__init__.py             |   5 +-
 .../service/controllers/AgentController.java       |   6 +-
 .../service/controllers/ExperimentController.java  |  23 +++--
 modules/agent-framework/airavata-agent/README.md   |   2 +-
 modules/agent-framework/airavata-agent/agent.go    |  52 ++++++----
 6 files changed, 111 insertions(+), 83 deletions(-)

diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
index 90b2ee2fd9..c110837cf6 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py
@@ -346,8 +346,9 @@ class AiravataOperator:
       fp = os.path.join("/data", file.name)
       rawdata = file.read_bytes()
       b64data = base64.b64encode(rawdata).decode()
-      res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+      res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", 
json={
           "agentId": agent_ref,
+          "envName": "base",
           "workingDir": ".",
           "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"]
       })
@@ -363,7 +364,7 @@ class AiravataOperator:
       else:
         exc_id = data["executionId"]
         while True:
-          res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+          res = 
requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}")
           data = res.json()
           if data["available"]:
             return [fp]
@@ -387,8 +388,9 @@ class AiravataOperator:
     Return Path: /{project_name}/{experiment_name}
 
     """
-    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+    res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", 
json={
         "agentId": agent_ref,
+        "envName": "base",
         "workingDir": ".",
         "arguments": ["sh", "-c", r"find /data -type d -name 'venv' -prune -o 
-type f -printf '%P\n' | sort"]
     })
@@ -403,7 +405,7 @@ class AiravataOperator:
     else:
       exc_id = data["executionId"]
       while True:
-        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}")
         data = res.json()
         if data["available"]:
           files = data["responseString"].split("\n")
@@ -424,8 +426,9 @@ class AiravataOperator:
     """
     import os
     fp = os.path.join("/data", remote_file)
-    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+    res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", 
json={
         "agentId": agent_ref,
+        "envName": "base",
         "workingDir": ".",
         "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
     })
@@ -442,7 +445,7 @@ class AiravataOperator:
     else:
       exc_id = data["executionId"]
       while True:
-        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}")
         data = res.json()
         if data["available"]:
           content = data["responseString"]
@@ -469,8 +472,9 @@ class AiravataOperator:
     """
     import os
     fp = os.path.join("/data", remote_file)
-    res = 
requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={
+    res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", 
json={
         "agentId": agent_ref,
+        "envName": "base",
         "workingDir": ".",
         "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"]
     })
@@ -487,7 +491,7 @@ class AiravataOperator:
     else:
       exc_id = data["executionId"]
       while True:
-        res = 
requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}")
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}")
         data = res.json()
         if data["available"]:
           content = data["responseString"]
@@ -711,20 +715,12 @@ class AiravataOperator:
   
   def execute_py(self, project: str, libraries: list[str], code: str, 
agent_id: str, pid: str, runtime_args: dict, cold_start: bool = True) -> str | 
None:
     # lambda to send request
-    print(f"[av] Attempting to submit to agent {agent_id}...")
-    make_request = lambda: 
requests.post(f"{self.connection_svc_url()}/agent/executepythonrequest", json={
-      "libraries": libraries,
-      "code": code,
-      "pythonVersion": "3.10", # TODO verify
-      "keepAlive": False, # TODO verify
-      "parentExperimentId": "/data", # the working directory
-      "agentId": agent_id,
-    })
     try:
       if cold_start:
-        res = make_request()
+        print(f"[av] Looking for Agent {agent_id}...")
+        res = requests.get(f"{self.connection_svc_url()}/{agent_id}")
         data = res.json()
-        if data["error"] == "Agent not found":
+        if data["agentUp"] == False:
           # waiting for agent to be available
           print(f"[av] Agent {agent_id} not found! Relaunching...")
           self.launch_experiment(
@@ -743,39 +739,55 @@ class AiravataOperator:
             walltime=runtime_args["walltime"],
             group=runtime_args["group"],
           )
-          return self.execute_py(project, libraries, code, agent_id, pid, 
runtime_args, cold_start=False)
-        elif data["executionId"] is not None:
-          print(f"[av] Submitted to Python Interpreter")
-          # agent response
-          exc_id = data["executionId"]
-        else:
-          # unrecoverable error
-          raise Exception(data["error"])
-      else:
-        # poll until agent is available
-        while True:
-          res = make_request()
-          data = res.json()
-          if data["error"] == "Agent not found":
-            # print(f"[av] Waiting for Agent {agent_id}...")
-            time.sleep(2)
-            continue
-          elif data["executionId"] is not None:
-            print(f"[av] Submitted to Python Interpreter")
-            exc_id = data["executionId"]
-            break
-          else:
-            raise Exception(data["error"])
-      assert exc_id is not None, f"Invalid execution id: {exc_id}"
+        return self.execute_py(project, libraries, code, agent_id, pid, 
runtime_args, cold_start=False)
+
+      print(f"[av] Waiting for Agent {agent_id}...")
+      while True: # poll for response
+        res = requests.get(f"{self.connection_svc_url()}/{agent_id}")
+        data = res.json()
+        if data["agentUp"]:
+          break
+        time.sleep(1)
       
-      # wait for the execution response to be available
-      while True:
-        res = 
requests.get(f"{self.connection_svc_url()}/agent/executepythonresponse/{exc_id}")
+      print(f"[av] Agent {agent_id} found! creating environment...")
+      res = requests.post(f"{self.connection_svc_url()}/setup/env", json={
+        "agentId": agent_id,
+        "envName": "base",
+        "libraries": ["python=3.10", "pip"],
+        "pip": libraries,
+      })
+      data = res.json()
+      if {exc_id := data["executionId"]} is None:
+        raise Exception(data["error"])
+      while True: # poll for response
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/setup/env/{exc_id}")
+        data = res.json()
+        if data["available"]:
+          response = str(data["responseString"])
+          break
+        time.sleep(1)
+
+      print(f"[av] Agent {agent_id} env created! executing code...")
+      res = requests.post(f"{self.connection_svc_url()}/agent/execute/python", 
json={
+        "agentId": agent_id,
+        "envName": "base",
+        "workingDir": ".",
+        "code": code,
+      })
+      data = res.json()
+      if {exc_id := data["executionId"]} is None:
+        raise Exception(data["error"])
+      while True: # poll for response
+        res = 
requests.get(f"{self.connection_svc_url()}/agent/execute/python/{exc_id}")
         data = res.json()
         if data["available"]:
           response = str(data["responseString"])
-          return response
+          break
         time.sleep(1)
+      
+      print(f"[av] Agent {agent_id} code executed! response: {response}")
+      return response
+    
     except Exception as e:
       print(f"[av] Remote execution failed! {e}")
       return None
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py
index 44e425fa36..90cdb8b8ee 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py
@@ -388,10 +388,9 @@ def run_on_runtime(rt_name: str, cell: str, store_history=False, silent=False, s
 
     url = api_base_url + '/api/v1/agent/execute/jupyter'
     data = {
-        "sessionId": "session1",
-        "keepAlive": True,
+        "agentId": rt.agentId,
+        "envName": "base",
         "code": cell,
-        "agentId": rt.agentId
     }
     json_data = json.dumps(data)
     response = requests.post(
diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
index dedd735293..8cdc790b48 100644
--- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
+++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
@@ -44,12 +44,12 @@ public class AgentController {
         return 
ResponseEntity.accepted().body(agentConnectionHandler.isAgentUp(agentId));
     }
 
-    @PostMapping("/create/tunnel")
+    @PostMapping("/setup/tunnel")
     public ResponseEntity<AgentTunnelAck> runTunnelCreationOnAgent(@Valid 
@RequestBody AgentTunnelRequest tunnelRequest) {
         return 
ResponseEntity.accepted().body(agentConnectionHandler.runTunnelOnAgent(tunnelRequest));
     }
 
-    @PostMapping("/create/env")
+    @PostMapping("/setup/env")
     public ResponseEntity<AgentEnvSetupAck> runEnvSetupOnAgent(@Valid 
@RequestBody AgentEnvSetupRequest envSetupRequest) {
         logger.info("Received env setup request to run on agent {}", 
envSetupRequest.getAgentId());
         if 
(agentConnectionHandler.isAgentUp(envSetupRequest.getAgentId()).isAgentUp()) {
@@ -62,7 +62,7 @@ public class AgentController {
         }
     }
 
-    @GetMapping("/create/env/{executionId}")
+    @GetMapping("/setup/env/{executionId}")
     public ResponseEntity<AgentEnvSetupResponse> 
getEnvSetupResponse(@PathVariable("executionId") String executionId) {
         return 
ResponseEntity.accepted().body(agentConnectionHandler.getEnvSetupResponse(executionId));
     }
diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java
index 351f3caf2d..c1cb78812c 100644
--- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java
+++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java
@@ -1,16 +1,21 @@
 package org.apache.airavata.agent.connection.service.controllers;
 
+import javax.validation.Valid;
+
 import 
org.apache.airavata.agent.connection.service.handlers.AgentManagementHandler;
-import org.apache.airavata.agent.connection.service.models.LaunchAgentRequest;
-import org.apache.airavata.agent.connection.service.models.LaunchAgentResponse;
-import 
org.apache.airavata.agent.connection.service.models.TerminateAgentResponse;
+import org.apache.airavata.agent.connection.service.models.AgentLaunchRequest;
+import org.apache.airavata.agent.connection.service.models.AgentLaunchResponse;
+import 
org.apache.airavata.agent.connection.service.models.AgentTerminateResponse;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.process.ProcessModel;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import javax.validation.Valid;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
 
 @RestController
 @RequestMapping("/api/v1/exp")
@@ -28,13 +33,13 @@ public class ExperimentController {
     }
 
     @PostMapping(value = "/launch", consumes = 
MediaType.APPLICATION_JSON_VALUE)
-    public ResponseEntity<LaunchAgentResponse> 
createAndLaunchExperiment(@Valid @RequestBody LaunchAgentRequest request) {
-        LaunchAgentResponse agentResponse = 
agentManagementHandler.createAndLaunchExperiment(request);
+    public ResponseEntity<AgentLaunchResponse> 
createAndLaunchExperiment(@Valid @RequestBody AgentLaunchRequest request) {
+        AgentLaunchResponse agentResponse = 
agentManagementHandler.createAndLaunchExperiment(request);
         return ResponseEntity.ok(agentResponse);
     }
 
     @GetMapping(value = "/terminate/{expId}", produces = 
MediaType.APPLICATION_JSON_VALUE)
-    public ResponseEntity<TerminateAgentResponse> 
terminateExperiment(@PathVariable("expId") String expId) {
+    public ResponseEntity<AgentTerminateResponse> 
terminateExperiment(@PathVariable("expId") String expId) {
         return 
ResponseEntity.ok(agentManagementHandler.terminateExperiment(expId));
     }
 
diff --git a/modules/agent-framework/airavata-agent/README.md b/modules/agent-framework/airavata-agent/README.md
index 2868723b54..3e83d71623 100644
--- a/modules/agent-framework/airavata-agent/README.md
+++ b/modules/agent-framework/airavata-agent/README.md
@@ -76,7 +76,7 @@ Response
 ```
 
 ```
-POST http://localhost:18880/api/v1/agent/create/tunnel
+POST http://localhost:18880/api/v1/agent/setup/tunnel
 
 {
     "destinationHost": "32.241.33.22",
diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go
index a89c30f258..3b8000aea9 100644
--- a/modules/agent-framework/airavata-agent/agent.go
+++ b/modules/agent-framework/airavata-agent/agent.go
@@ -96,20 +96,22 @@ func startInterceptor(stream Stream, grpcStreamChannel chan struct{}) {
                        log.Printf("[agent.go] Recived a python execution 
request\n")
                        executionId := x.PythonExecutionRequest.ExecutionId
                        envName = x.PythonExecutionRequest.EnvName
+                       workingDir := x.PythonExecutionRequest.WorkingDir
                        code := x.PythonExecutionRequest.Code
-                       workDir := x.PythonExecutionRequest.WorkingDir
-                       go executePython(stream, executionId, envName, workDir, 
code)
+                       go executePython(stream, executionId, envName, 
workingDir, code)
 
                case *protos.ServerMessage_CommandExecutionRequest:
                        log.Printf("[agent.go] Recived a shell execution 
request\n")
                        executionId := x.CommandExecutionRequest.ExecutionId
+                       envName = x.CommandExecutionRequest.EnvName
+                       workingDir := x.CommandExecutionRequest.WorkingDir
                        execArgs := x.CommandExecutionRequest.Arguments
-                       go executeShell(stream, executionId, envName, execArgs)
+                       go executeShell(stream, executionId, envName, 
workingDir, execArgs)
 
                case *protos.ServerMessage_JupyterExecutionRequest:
                        log.Printf("[agent.go] Recived a jupyter execution 
request\n")
                        executionId := x.JupyterExecutionRequest.ExecutionId
-                       envName := x.JupyterExecutionRequest.EnvName
+                       envName = x.JupyterExecutionRequest.EnvName
                        code := x.JupyterExecutionRequest.Code
                        go executeJupyter(stream, executionId, envName, code)
 
@@ -132,21 +134,28 @@ func createEnv(stream Stream, executionId string, envName string, envLibs []stri
        log.Printf("[agent.go] createEnv() Env libs %s\n", envLibs)
        log.Printf("[agent.go] createEnv() Env pip %s\n", envPip)
        // create environment
-       createEnvCmd := exec.Command("micromamba", "create", "-n", envName, 
"--yes", "--quiet")
-       if err := createEnvCmd.Wait(); err != nil {
-               log.Printf("[agent.go] createEnv() Error creating environment: 
%v\n", err)
-               return
+       if envName != "base" {
+               createEnvCmd := exec.Command("micromamba", "create", "-n", 
envName, "--yes", "--quiet")
+               if err := createEnvCmd.Wait(); err != nil {
+                       log.Printf("[agent.go] createEnv() Error creating 
environment: %v\n", err)
+                       return
+               }
+               log.Printf("[agent.go] createEnv() Environment created: %s\n", 
envName)
        }
-       log.Printf("[agent.go] createEnv() Environment created: %s\n", envName)
-       installDepsCmd := exec.Command("micromamba", "install", "-n", envName, 
"--yes", "--quiet", strings.Join(envLibs, " "))
-       if err := installDepsCmd.Wait(); err != nil {
-               log.Printf("[agent.go] createEnv() Error waiting for command: 
%v\n", err)
-               return
+
+       if len(envLibs) > 0 {
+               installDepsCmd := exec.Command("micromamba", "install", "-n", 
envName, "--yes", "--quiet", strings.Join(envLibs, " "))
+               if err := installDepsCmd.Wait(); err != nil {
+                       log.Printf("[agent.go] createEnv() Error waiting for 
command: %v\n", err)
+                       return
+               }
        }
-       installPipCmd := exec.Command("micromamba", "run", "-n", envName, 
"pip", "install", strings.Join(envLibs, " "))
-       if err := installPipCmd.Wait(); err != nil {
-               log.Printf("[agent.go] createEnv() Error waiting for command: 
%v\n", err)
-               return
+       if len(envPip) > 0 {
+               installPipCmd := exec.Command("micromamba", "run", "-n", 
envName, "pip", "install", strings.Join(envLibs, " "))
+               if err := installPipCmd.Wait(); err != nil {
+                       log.Printf("[agent.go] createEnv() Error waiting for 
command: %v\n", err)
+                       return
+               }
        }
        // start python server
        go startPythonServer(envName)
@@ -209,8 +218,10 @@ func executePython(stream Stream, executionId string, envName string, workingDir
        log.Printf("[agent.go] executePython() Working Dir %s\n", workingDir)
        log.Printf("[agent.go] executePython() Code %s\n", code)
        // Run command
-       pythonCmd := exec.Command("micromamba", "run", "-n", envName, "python", 
"-c", code)
-       output, err := pythonCmd.CombinedOutput()
+       cmd := exec.Command("micromamba", "run", "-n", envName, "python", "-c")
+       cmd.Dir = workingDir
+       cmd.Stdin = strings.NewReader(code)
+       output, err := cmd.CombinedOutput()
        var responseString string
        if err != nil {
                responseString = fmt.Sprintf("Error: %v", err)
@@ -234,12 +245,13 @@ func executePython(stream Stream, executionId string, envName string, workingDir
        }
 }
 
-func executeShell(stream Stream, executionId string, envName string, execArgs 
[]string) {
+func executeShell(stream Stream, executionId string, envName string, 
workingDir string, execArgs []string) {
        log.Printf("[agent.go] executeShell() Execution id %s\n", executionId)
        log.Printf("[agent.go] executeShell() Env name %s\n", envName)
        log.Printf("[agent.go] executeShell() Exec args %s\n", execArgs)
        // Run command
        cmd := exec.Command("micromamba", "run", "-n", envName, 
strings.Join(execArgs, " "))
+       cmd.Dir = workingDir
        output, err := cmd.CombinedOutput()
        var responseString string
        if err != nil {

Reply via email to