This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch cybershuttle-staging in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 9c38135d6cc19f1f27ff53b54a9feca3b8d85b41 Author: yasith <[email protected]> AuthorDate: Thu Apr 3 04:29:52 2025 -0400 refine agent.go, fix bugs --- .../airavata_experiments/airavata.py | 106 ++++++++++++--------- .../airavata_jupyter_magic/__init__.py | 5 +- .../service/controllers/AgentController.java | 6 +- .../service/controllers/ExperimentController.java | 23 +++-- modules/agent-framework/airavata-agent/README.md | 2 +- modules/agent-framework/airavata-agent/agent.go | 52 ++++++---- 6 files changed, 111 insertions(+), 83 deletions(-) diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py index 90b2ee2fd9..c110837cf6 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py @@ -346,8 +346,9 @@ class AiravataOperator: fp = os.path.join("/data", file.name) rawdata = file.read_bytes() b64data = base64.b64encode(rawdata).decode() - res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, + "envName": "base", "workingDir": ".", "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"] }) @@ -363,7 +364,7 @@ class AiravataOperator: else: exc_id = data["executionId"] while True: - res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + res = requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}") data = res.json() if data["available"]: return [fp] @@ -387,8 +388,9 @@ class AiravataOperator: Return Path: /{project_name}/{experiment_name} """ - res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, + "envName": "base", "workingDir": ".", "arguments": ["sh", "-c", r"find /data -type d -name 'venv' -prune -o -type f -printf '%P\n' | sort"] }) @@ -403,7 +405,7 @@ class AiravataOperator: else: exc_id = data["executionId"] while True: - res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + res = requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}") data = res.json() if data["available"]: files = data["responseString"].split("\n") @@ -424,8 +426,9 @@ class AiravataOperator: """ import os fp = os.path.join("/data", remote_file) - res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, + "envName": "base", "workingDir": ".", "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] }) @@ -442,7 +445,7 @@ class AiravataOperator: else: exc_id = data["executionId"] while True: - res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + res = requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}") data = res.json() if data["available"]: content = data["responseString"] @@ -469,8 +472,9 @@ class AiravataOperator: """ import os fp = os.path.join("/data", remote_file) - res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ + res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, + "envName": "base", "workingDir": ".", "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] }) @@ -487,7 +491,7 @@ class AiravataOperator: else: exc_id = data["executionId"] while True: - res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") + res = requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}") data = res.json() if data["available"]: content = data["responseString"] @@ -711,20 +715,12 @@ class AiravataOperator: def execute_py(self, project: str, libraries: list[str], code: str, agent_id: str, pid: str, runtime_args: dict, cold_start: bool = True) -> str | None: # lambda to send request - print(f"[av] Attempting to submit to agent {agent_id}...") - make_request = lambda: requests.post(f"{self.connection_svc_url()}/agent/executepythonrequest", json={ - "libraries": libraries, - "code": code, - "pythonVersion": "3.10", # TODO verify - "keepAlive": False, # TODO verify - "parentExperimentId": "/data", # the working directory - "agentId": agent_id, - }) try: if cold_start: - res = make_request() + print(f"[av] Looking for Agent {agent_id}...") + res = requests.get(f"{self.connection_svc_url()}/{agent_id}") data = res.json() - if data["error"] == "Agent not found": + if data["agentUp"] == False: # waiting for agent to be available print(f"[av] Agent {agent_id} not found! Relaunching...") self.launch_experiment( @@ -743,39 +739,55 @@ class AiravataOperator: walltime=runtime_args["walltime"], group=runtime_args["group"], ) - return self.execute_py(project, libraries, code, agent_id, pid, runtime_args, cold_start=False) - elif data["executionId"] is not None: - print(f"[av] Submitted to Python Interpreter") - # agent response - exc_id = data["executionId"] - else: - # unrecoverable error - raise Exception(data["error"]) - else: - # poll until agent is available - while True: - res = make_request() - data = res.json() - if data["error"] == "Agent not found": - # print(f"[av] Waiting for Agent {agent_id}...") - time.sleep(2) - continue - elif data["executionId"] is not None: - print(f"[av] Submitted to Python Interpreter") - exc_id = data["executionId"] - break - else: - raise Exception(data["error"]) - assert exc_id is not None, f"Invalid execution id: {exc_id}" + return self.execute_py(project, libraries, code, agent_id, pid, runtime_args, cold_start=False) + + print(f"[av] Waiting for Agent {agent_id}...") + while True: # poll for response + res = requests.get(f"{self.connection_svc_url()}/{agent_id}") + data = res.json() + if data["agentUp"]: + break + time.sleep(1) - # wait for the execution response to be available - while True: - res = requests.get(f"{self.connection_svc_url()}/agent/executepythonresponse/{exc_id}") + print(f"[av] Agent {agent_id} found! creating environment...") + res = requests.post(f"{self.connection_svc_url()}/setup/env", json={ + "agentId": agent_id, + "envName": "base", + "libraries": ["python=3.10", "pip"], + "pip": libraries, + }) + data = res.json() + if {exc_id := data["executionId"]} is None: + raise Exception(data["error"]) + while True: # poll for response + res = requests.get(f"{self.connection_svc_url()}/agent/setup/env/{exc_id}") + data = res.json() + if data["available"]: + response = str(data["responseString"]) + break + time.sleep(1) + + print(f"[av] Agent {agent_id} env created! executing code...") + res = requests.post(f"{self.connection_svc_url()}/agent/execute/python", json={ + "agentId": agent_id, + "envName": "base", + "workingDir": ".", + "code": code, + }) + data = res.json() + if {exc_id := data["executionId"]} is None: + raise Exception(data["error"]) + while True: # poll for response + res = requests.get(f"{self.connection_svc_url()}/agent/execute/python/{exc_id}") data = res.json() if data["available"]: response = str(data["responseString"]) - return response + break time.sleep(1) + + print(f"[av] Agent {agent_id} code executed! response: {response}") + return response + except Exception as e: print(f"[av] Remote execution failed! {e}") return None diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py index 44e425fa36..90cdb8b8ee 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_jupyter_magic/__init__.py @@ -388,10 +388,9 @@ def run_on_runtime(rt_name: str, cell: str, store_history=False, silent=False, s url = api_base_url + '/api/v1/agent/execute/jupyter' data = { - "sessionId": "session1", - "keepAlive": True, + "agentId": rt.agentId, + "envName": "base", "code": cell, - "agentId": rt.agentId } json_data = json.dumps(data) response = requests.post( diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java index dedd735293..8cdc790b48 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java @@ -44,12 +44,12 @@ public class AgentController { return ResponseEntity.accepted().body(agentConnectionHandler.isAgentUp(agentId)); } - @PostMapping("/create/tunnel") + @PostMapping("/setup/tunnel") public ResponseEntity<AgentTunnelAck> runTunnelCreationOnAgent(@Valid @RequestBody AgentTunnelRequest tunnelRequest) { return ResponseEntity.accepted().body(agentConnectionHandler.runTunnelOnAgent(tunnelRequest)); } - @PostMapping("/create/env") + @PostMapping("/setup/env") public ResponseEntity<AgentEnvSetupAck> runEnvSetupOnAgent(@Valid @RequestBody AgentEnvSetupRequest envSetupRequest) { logger.info("Received env setup request to run on agent {}", envSetupRequest.getAgentId()); if (agentConnectionHandler.isAgentUp(envSetupRequest.getAgentId()).isAgentUp()) { @@ -62,7 +62,7 @@ public class AgentController { } } - @GetMapping("/create/env/{executionId}") + @GetMapping("/setup/env/{executionId}") public ResponseEntity<AgentEnvSetupResponse> getEnvSetupResponse(@PathVariable("executionId") String executionId) { return ResponseEntity.accepted().body(agentConnectionHandler.getEnvSetupResponse(executionId)); } diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java index 351f3caf2d..c1cb78812c 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/ExperimentController.java @@ -1,16 +1,21 @@ package org.apache.airavata.agent.connection.service.controllers; +import javax.validation.Valid; + import org.apache.airavata.agent.connection.service.handlers.AgentManagementHandler; -import org.apache.airavata.agent.connection.service.models.LaunchAgentRequest; -import org.apache.airavata.agent.connection.service.models.LaunchAgentResponse; -import org.apache.airavata.agent.connection.service.models.TerminateAgentResponse; +import org.apache.airavata.agent.connection.service.models.AgentLaunchRequest; +import org.apache.airavata.agent.connection.service.models.AgentLaunchResponse; +import org.apache.airavata.agent.connection.service.models.AgentTerminateResponse; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.process.ProcessModel; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import javax.validation.Valid; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/api/v1/exp") @@ -28,13 +33,13 @@ public class ExperimentController { } @PostMapping(value = "/launch", consumes = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<LaunchAgentResponse> createAndLaunchExperiment(@Valid @RequestBody LaunchAgentRequest request) { - LaunchAgentResponse agentResponse = agentManagementHandler.createAndLaunchExperiment(request); + public ResponseEntity<AgentLaunchResponse> createAndLaunchExperiment(@Valid @RequestBody AgentLaunchRequest request) { + AgentLaunchResponse agentResponse = agentManagementHandler.createAndLaunchExperiment(request); return ResponseEntity.ok(agentResponse); } @GetMapping(value = "/terminate/{expId}", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<TerminateAgentResponse> terminateExperiment(@PathVariable("expId") String expId) { + public ResponseEntity<AgentTerminateResponse> terminateExperiment(@PathVariable("expId") String expId) { return ResponseEntity.ok(agentManagementHandler.terminateExperiment(expId)); } diff --git a/modules/agent-framework/airavata-agent/README.md b/modules/agent-framework/airavata-agent/README.md index 2868723b54..3e83d71623 100644 --- a/modules/agent-framework/airavata-agent/README.md +++ b/modules/agent-framework/airavata-agent/README.md @@ -76,7 +76,7 @@ Response ``` ``` -POST http://localhost:18880/api/v1/agent/create/tunnel +POST http://localhost:18880/api/v1/agent/setup/tunnel { "destinationHost": "32.241.33.22", diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go index a89c30f258..3b8000aea9 100644 --- a/modules/agent-framework/airavata-agent/agent.go +++ b/modules/agent-framework/airavata-agent/agent.go @@ -96,20 +96,22 @@ func startInterceptor(stream Stream, grpcStreamChannel chan struct{}) { log.Printf("[agent.go] Recived a python execution request\n") executionId := x.PythonExecutionRequest.ExecutionId envName = x.PythonExecutionRequest.EnvName + workingDir := x.PythonExecutionRequest.WorkingDir code := x.PythonExecutionRequest.Code - workDir := x.PythonExecutionRequest.WorkingDir - go executePython(stream, executionId, envName, workDir, code) + go executePython(stream, executionId, envName, workingDir, code) case *protos.ServerMessage_CommandExecutionRequest: log.Printf("[agent.go] Recived a shell execution request\n") executionId := x.CommandExecutionRequest.ExecutionId + envName = x.CommandExecutionRequest.EnvName + workingDir := x.CommandExecutionRequest.WorkingDir execArgs := x.CommandExecutionRequest.Arguments - go executeShell(stream, executionId, envName, execArgs) + go executeShell(stream, executionId, envName, workingDir, execArgs) case *protos.ServerMessage_JupyterExecutionRequest: log.Printf("[agent.go] Recived a jupyter execution request\n") executionId := x.JupyterExecutionRequest.ExecutionId - envName := x.JupyterExecutionRequest.EnvName + envName = x.JupyterExecutionRequest.EnvName code := x.JupyterExecutionRequest.Code go executeJupyter(stream, executionId, envName, code) @@ -132,21 +134,28 @@ func createEnv(stream Stream, executionId string, envName string, envLibs []stri log.Printf("[agent.go] createEnv() Env libs %s\n", envLibs) log.Printf("[agent.go] createEnv() Env pip %s\n", envPip) // create environment - createEnvCmd := exec.Command("micromamba", "create", "-n", envName, "--yes", "--quiet") - if err := createEnvCmd.Wait(); err != nil { - log.Printf("[agent.go] createEnv() Error creating environment: %v\n", err) - return + if envName != "base" { + createEnvCmd := exec.Command("micromamba", "create", "-n", envName, "--yes", "--quiet") + if err := createEnvCmd.Wait(); err != nil { + log.Printf("[agent.go] createEnv() Error creating environment: %v\n", err) + return + } + log.Printf("[agent.go] createEnv() Environment created: %s\n", envName) } - log.Printf("[agent.go] createEnv() Environment created: %s\n", envName) - installDepsCmd := exec.Command("micromamba", "install", "-n", envName, "--yes", "--quiet", strings.Join(envLibs, " ")) - if err := installDepsCmd.Wait(); err != nil { - log.Printf("[agent.go] createEnv() Error waiting for command: %v\n", err) - return + + if len(envLibs) > 0 { + installDepsCmd := exec.Command("micromamba", "install", "-n", envName, "--yes", "--quiet", strings.Join(envLibs, " ")) + if err := installDepsCmd.Wait(); err != nil { + log.Printf("[agent.go] createEnv() Error waiting for command: %v\n", err) + return + } } - installPipCmd := exec.Command("micromamba", "run", "-n", envName, "pip", "install", strings.Join(envLibs, " ")) - if err := installPipCmd.Wait(); err != nil { - log.Printf("[agent.go] createEnv() Error waiting for command: %v\n", err) - return + if len(envPip) > 0 { + installPipCmd := exec.Command("micromamba", "run", "-n", envName, "pip", "install", strings.Join(envLibs, " ")) + if err := installPipCmd.Wait(); err != nil { + log.Printf("[agent.go] createEnv() Error waiting for command: %v\n", err) + return + } } // start python server go startPythonServer(envName) @@ -209,8 +218,10 @@ func executePython(stream Stream, executionId string, envName string, workingDir log.Printf("[agent.go] executePython() Working Dir %s\n", workingDir) log.Printf("[agent.go] executePython() Code %s\n", code) // Run command - pythonCmd := exec.Command("micromamba", "run", "-n", envName, "python", "-c", code) - output, err := pythonCmd.CombinedOutput() + cmd := exec.Command("micromamba", "run", "-n", envName, "python", "-c") + cmd.Dir = workingDir + cmd.Stdin = strings.NewReader(code) + output, err := cmd.CombinedOutput() var responseString string if err != nil { responseString = fmt.Sprintf("Error: %v", err) @@ -234,12 +245,13 @@ func executePython(stream Stream, executionId string, envName string, workingDir } } -func executeShell(stream Stream, executionId string, envName string, execArgs []string) { +func executeShell(stream Stream, executionId string, envName string, workingDir string, execArgs []string) { log.Printf("[agent.go] executeShell() Execution id %s\n", executionId) log.Printf("[agent.go] executeShell() Env name %s\n", envName) log.Printf("[agent.go] executeShell() Exec args %s\n", execArgs) // Run command cmd := exec.Command("micromamba", "run", "-n", envName, strings.Join(execArgs, " ")) + cmd.Dir = workingDir output, err := cmd.CombinedOutput() var responseString string if err != nil {
