This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch agent-framewok-refactoring in repository https://gitbox.apache.org/repos/asf/airavata.git
commit ee9c67cdc0f62fa2566565e47587faa4340960c5 Author: lahiruj <[email protected]> AuthorDate: Fri Oct 18 18:32:48 2024 -0400 included error outputs and enhanced agent log messages --- modules/agent-framework/airavata-agent/agent.go | 93 +++++++++++----------- .../v1l4/jupyterlab/labconfig/airavata_magics.py | 20 ++++- .../jupyter/deployments/i-guide/agent/kernel.py | 19 +++-- .../jupyterlab/labconfig/airavata_magics.py | 20 ++++- .../airavata-agent/jupyter/kernel.py | 19 +++-- .../jupyter/labconfig/airavata_magics.py | 20 ++++- 6 files changed, 128 insertions(+), 63 deletions(-) diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go index 1fae63950a..6e89469fd2 100644 --- a/modules/agent-framework/airavata-agent/agent.go +++ b/modules/agent-framework/airavata-agent/agent.go @@ -56,21 +56,21 @@ func main() { stdout, err := cmd.StdoutPipe() if err != nil { - fmt.Println("Error creating StdoutPipe:", err) + fmt.Println("[agent.go] Error creating StdoutPipe:", err) return } // Get stderr pipe stderr, err := cmd.StderrPipe() if err != nil { - fmt.Println("Error creating StderrPipe:", err) + fmt.Println("[agent.go] Error creating StderrPipe:", err) return } - log.Printf("Starting command for execution") + log.Printf("[agent.go] Starting command for execution") // Start the command if err := cmd.Start(); err != nil { - fmt.Println("Error starting command:", err) + fmt.Println("[agent.go] Error starting command:", err) return } @@ -81,24 +81,24 @@ func main() { // Stream stdout go func() { for stdoutScanner.Scan() { - fmt.Printf("stdout: %s\n", stdoutScanner.Text()) + fmt.Printf("[agent.go] stdout: %s\n", stdoutScanner.Text()) } }() // Stream stderr go func() { for stderrScanner.Scan() { - fmt.Printf("stderr: %s\n", stderrScanner.Text()) + fmt.Printf("[agent.go] stderr: %s\n", stderrScanner.Text()) } }() // Wait for the command to finish if err := cmd.Wait(); err != nil { - fmt.Println("Error waiting for command:", err) + fmt.Println("[agent.go] Error waiting for command:", err) return } - fmt.Println("Command finished") + fmt.Println("[agent.go] Command finished") }() go func() { @@ -109,17 +109,17 @@ func main() { return } if err != nil { - log.Fatalf("Failed to receive a message : %v", err) + log.Fatalf("[agent.go] Failed to receive a message : %v", err) } - log.Printf("Received message %s", in.Message) + log.Printf("[agent.go] Received message %s", in.Message) switch x := in.GetMessage().(type) { case *protos.ServerMessage_CommandExecutionRequest: - log.Printf("Recived a command execution request") + log.Printf("[agent.go] Recived a command execution request") executionId := x.CommandExecutionRequest.ExecutionId execArgs := x.CommandExecutionRequest.Arguments - log.Printf("Execution id %s", executionId) + log.Printf("[agent.go] Execution id %s", executionId) cmd := exec.Command(execArgs[0], execArgs[1:]...) - log.Printf("Completed execution with the id %s", executionId) + log.Printf("[agent.go] Completed execution with the id %s", executionId) stdout, err := cmd.Output() if err != nil { log.Fatalf(err.Error()) @@ -127,60 +127,60 @@ func main() { } stdoutString := string(stdout) - log.Printf("Execution output is %s", stdoutString) + log.Printf("[agent.go] Execution output is %s", stdoutString) if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_CommandExecutionResponse{ CommandExecutionResponse: &protos.CommandExecutionResponse{ExecutionId: executionId, ResponseString: stdoutString}}}); err != nil { - log.Printf("Failed to send execution result to server: %v", err) + log.Printf("[agent.go] Failed to send execution result to server: %v", err) } case *protos.ServerMessage_JupyterExecutionRequest: - log.Printf("Recived a jupyter execution request") + log.Printf("[agent.go] Recived a jupyter execution request") executionId := x.JupyterExecutionRequest.ExecutionId sessionId := x.JupyterExecutionRequest.SessionId code := x.JupyterExecutionRequest.Code - log.Printf("Execution ID: %s, Session ID: %s, Code: %s", executionId, sessionId, code) + log.Printf("[agent.go] Execution ID: %s, Session ID: %s, Code: %s", executionId, sessionId, code) url := "http://127.0.0.1:15000/start" client := &http.Client{} req, err := http.NewRequest("GET", url, nil) if err != nil { - log.Printf("Failed to create the request start jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to create the request start jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } - log.Printf("Sending the jupyter execution result to server...") + log.Printf("[agent.go] Sending the jupyter kernel start request to server...") resp, err := client.Do(req) if err != nil { - log.Printf("Failed to send the request start jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to send the request start jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } - log.Printf("Successfully sent the jupyter execution result to server") + log.Printf("[agent.go] Successfully sent the jupyter kernel start request to server") defer func() { err := resp.Body.Close() if err != nil { - log.Printf("Failed to close the response body for kernel start: %v", err) + log.Printf("[agent.go] Failed to close the response body for kernel start: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return @@ -189,45 +189,46 @@ func main() { body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("Failed to read response for start jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to read response for start jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } - log.Printf("Starting to marshal JSON data...") + log.Printf("[agent.go] Starting to marshal execution request JSON data...") url = "http://127.0.0.1:15000/execute" data := map[string]string{ - "code": code, + "code": code, + "executionId": executionId, } jsonData, err := json.Marshal(data) if err != nil { - log.Fatalf("Failed to marshal JSON: %v", err) + log.Fatalf("[agent.go] Failed to marshal JSON: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } - log.Printf("Successful marshaling the JSON data") + log.Printf("[agent.go] Successful marshaling the JSON data") req, err = http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { - log.Printf("Failed to create the request run jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to create the request run jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return @@ -238,63 +239,63 @@ func main() { resp, err = client.Do(req) if err != nil { - log.Printf("Failed to send the request run jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to send the request run jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } defer func() { - log.Printf("Closing the response...") + log.Printf("[agent.go] Closing the response...") err := resp.Body.Close() if err != nil { - log.Printf("Failed to close the response body for kernel execution: %v", err) + log.Printf("[agent.go] Failed to close the response body for kernel execution: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } }() - log.Printf("Sending the jupyter execution result to server...") + log.Printf("[agent.go] Sending the jupyter execution " + executionId + "result to server...") body, err = ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("Failed to read response for run jupyter kernel: %v", err) + log.Printf("[agent.go] Failed to read response for run jupyter kernel: %v", err) jupyterResponse := "Failed while running the cell in remote. Please retry" if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } return } jupyterResponse := string(body) - log.Println("Jupyter execution response: " + jupyterResponse) + log.Println("[agent.go] Jupyter execution " + executionId + "response: " + jupyterResponse) if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{ JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil { - log.Printf("Failed to send jupyter execution result to server: %v", err) + log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err) } case *protos.ServerMessage_TunnelCreationRequest: - log.Printf("Received a tunnel creation request") + log.Printf("[agent.go] Received a tunnel creation request") host := x.TunnelCreationRequest.DestinationHost destPort := x.TunnelCreationRequest.DestinationPort srcPort := x.TunnelCreationRequest.SourcePort keyPath := x.TunnelCreationRequest.SshKeyPath sshUser := x.TunnelCreationRequest.SshUserName - log.Printf("Tunnel details - Host: %s, DestPort: %s, SrcPort: %s, KeyPath: %s, SSH User: %s", host, destPort, srcPort, keyPath, sshUser) + log.Printf("[agent.go] Tunnel details - Host: %s, DestPort: %s, SrcPort: %s, KeyPath: %s, SSH User: %s", host, destPort, srcPort, keyPath, sshUser) openRemoteTunnel(host, destPort, srcPort, sshUser, keyPath) } diff --git a/modules/agent-framework/airavata-agent/jupyter/deployments/cerebrum/v1l4/jupyterlab/labconfig/airavata_magics.py b/modules/agent-framework/airavata-agent/jupyter/deployments/cerebrum/v1l4/jupyterlab/labconfig/airavata_magics.py index dd629dfd09..b472ba2382 100644 --- a/modules/agent-framework/airavata-agent/jupyter/deployments/cerebrum/v1l4/jupyterlab/labconfig/airavata_magics.py +++ b/modules/agent-framework/airavata-agent/jupyter/deployments/cerebrum/v1l4/jupyterlab/labconfig/airavata_magics.py @@ -179,7 +179,25 @@ def run_remote(line, cell): # print(data_obj['text/plain']) elif output_type == 'stream': - print(output.get('text', '')) + stream_name = output.get('name', 'stdout') + stream_text = output.get('text', '') + if stream_name == 'stderr': + error_html = f""" + <div style=" + color: #a71d5d; + background-color: #fdd; + border: 1px solid #a71d5d; + padding: 10px; + border-radius: 5px; + font-family: Consolas, 'Courier New', monospace; + white-space: pre-wrap; + "> + {stream_text} + </div> + """ + display(HTML(error_html)) + else: + print(stream_text) elif output_type == 'error': ename = output.get('ename', 'Error') diff --git a/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/agent/kernel.py b/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/agent/kernel.py index 17dc3ab110..c9bd281479 100644 --- a/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/agent/kernel.py +++ b/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/agent/kernel.py @@ -57,10 +57,8 @@ def execute(): while True: try: - msg = kc.get_iopub_msg(timeout=1) - print("------------------") - print(msg) - print("-================-") + msg = kc.get_iopub_msg(timeout=5) + content = msg.get("content", {}) msg_type = msg.get("msg_type", "") @@ -68,7 +66,7 @@ def execute(): if msg_type == "execute_input": execution_noticed = True - # Handle stdout text stream + # Handle stdout streams if msg_type == "stream" and content.get("name") == "stdout": outputs.append({ "output_type": "stream", @@ -76,7 +74,15 @@ def execute(): "text": content.get("text", "") }) - # Capture display data (e.g. plot) + # Handle stderr streams + if msg_type == "stream" and content.get("name") == "stderr": + outputs.append({ + "output_type": "stream", + "name": "stderr", + "text": content.get("text", "") + }) + + # Handle display data (e.g. plots) if msg_type == "display_data": outputs.append({ "output_type": "display_data", @@ -112,7 +118,6 @@ def execute(): return jsonify({'error': "Execution interrupted by user"}), 500 except Exception as e: print(f"Error while getting Jupyter message: {str(e)}") - pass response = { "outputs": outputs diff --git a/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/jupyterlab/labconfig/airavata_magics.py b/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/jupyterlab/labconfig/airavata_magics.py index ec0d653d85..7d286f0ab2 100644 --- a/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/jupyterlab/labconfig/airavata_magics.py +++ b/modules/agent-framework/airavata-agent/jupyter/deployments/i-guide/jupyterlab/labconfig/airavata_magics.py @@ -187,7 +187,25 @@ def run_remote(line, cell): # print(data_obj['text/plain']) elif output_type == 'stream': - print(output.get('text', '')) + stream_name = output.get('name', 'stdout') + stream_text = output.get('text', '') + if stream_name == 'stderr': + error_html = f""" + <div style=" + color: #a71d5d; + background-color: #fdd; + border: 1px solid #a71d5d; + padding: 10px; + border-radius: 5px; + font-family: Consolas, 'Courier New', monospace; + white-space: pre-wrap; + "> + {stream_text} + </div> + """ + display(HTML(error_html)) + else: + print(stream_text) elif output_type == 'error': ename = output.get('ename', 'Error') diff --git a/modules/agent-framework/airavata-agent/jupyter/kernel.py b/modules/agent-framework/airavata-agent/jupyter/kernel.py index 17dc3ab110..c9bd281479 100644 --- a/modules/agent-framework/airavata-agent/jupyter/kernel.py +++ b/modules/agent-framework/airavata-agent/jupyter/kernel.py @@ -57,10 +57,8 @@ def execute(): while True: try: - msg = kc.get_iopub_msg(timeout=1) - print("------------------") - print(msg) - print("-================-") + msg = kc.get_iopub_msg(timeout=5) + content = msg.get("content", {}) msg_type = msg.get("msg_type", "") @@ -68,7 +66,7 @@ def execute(): if msg_type == "execute_input": execution_noticed = True - # Handle stdout text stream + # Handle stdout streams if msg_type == "stream" and content.get("name") == "stdout": outputs.append({ "output_type": "stream", @@ -76,7 +74,15 @@ def execute(): "text": content.get("text", "") }) - # Capture display data (e.g. plot) + # Handle stderr streams + if msg_type == "stream" and content.get("name") == "stderr": + outputs.append({ + "output_type": "stream", + "name": "stderr", + "text": content.get("text", "") + }) + + # Handle display data (e.g. plots) if msg_type == "display_data": outputs.append({ "output_type": "display_data", @@ -112,7 +118,6 @@ def execute(): return jsonify({'error': "Execution interrupted by user"}), 500 except Exception as e: print(f"Error while getting Jupyter message: {str(e)}") - pass response = { "outputs": outputs diff --git a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py index 322a10adc8..d2b34d5d6a 100644 --- a/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py +++ b/modules/agent-framework/airavata-agent/jupyter/labconfig/airavata_magics.py @@ -178,7 +178,25 @@ def run_remote(line, cell): # print(data_obj['text/plain']) elif output_type == 'stream': - print(output.get('text', '')) + stream_name = output.get('name', 'stdout') + stream_text = output.get('text', '') + if stream_name == 'stderr': + error_html = f""" + <div style=" + color: #a71d5d; + background-color: #fdd; + border: 1px solid #a71d5d; + padding: 10px; + border-radius: 5px; + font-family: Consolas, 'Courier New', monospace; + white-space: pre-wrap; + "> + {stream_text} + </div> + """ + display(HTML(error_html)) + else: + print(stream_text) elif output_type == 'error': ename = output.get('ename', 'Error')
