This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 9ccf69a350 Emitting agent execution response
9ccf69a350 is described below
commit 9ccf69a350b446e8b5d265d06ebc06b0bca6f52a
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Wed Jul 17 16:59:03 2024 -0400
Emitting agent execution response
---
modules/agent-framework/airavata-agent/agent.go | 10 ++++++-
.../service/controllers/AgentController.java | 5 ++++
.../connection/service/handlers/AgentHandler.java | 17 +++++++++++-
.../service/models/AgentCommandResponse.java | 31 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go
index bdefd0bf79..071418bf37 100644
--- a/modules/agent-framework/airavata-agent/agent.go
+++ b/modules/agent-framework/airavata-agent/agent.go
@@ -67,7 +67,15 @@ func main() {
log.Fatalf(err.Error())
return
}
- log.Printf("Execution output is %s",
string(stdout))
+
+ stdoutString := string(stdout)
+ log.Printf("Execution output is %s",
stdoutString)
+
+ if err :=
stream.Send(&protos.AgentMessage{Message:
+
&protos.AgentMessage_CommandExecutionResponse{
+ CommandExecutionResponse:
&protos.CommandExecutionResponse{ExecutionId: executionId, ResponseString:
stdoutString}}}); err != nil {
+ log.Printf("Failed to send execution
result to server: %v", err)
+ }
case *protos.ServerMessage_TunnelCreationRequest:
log.Printf("Received a tunnel creation request")
diff --git a/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
index 61ef791a60..afb458c5fb 100644
--- a/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
+++ b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
@@ -44,4 +44,9 @@ public class AgentController {
}
}
+ @GetMapping("/execution/{executionId}") // GET endpoint: fetch the response recorded for a single command execution
+ public ResponseEntity<AgentCommandResponse>
getExecutionResponse(@PathVariable("executionId") String executionId) { // executionId is bound from the URL path segment
+ return
ResponseEntity.accepted().body(agentHandler.getAgentCommandResponse(executionId)); // NOTE(review): built via accepted() whether or not a response was found; callers have to inspect the body's availability flag
+ }
+
}
diff --git a/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentHandler.java b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentHandler.java
index 7c34dff2fa..95cc86e2a3 100644
--- a/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentHandler.java
+++ b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentHandler.java
@@ -24,6 +24,8 @@ public class AgentHandler extends AgentCommunicationServiceGrpc.AgentCommunicati
// <agentId, streamId>
private final Map<String, String> AGENT_STREAM_MAPPING = new
ConcurrentHashMap<>();
+ private final Map<String, CommandExecutionResponse>
EXECUTION_RESPONSE_CACHE = new ConcurrentHashMap<>(); // <executionId, response>; NOTE(review): entries are only ever added in this diff, no eviction is visible - confirm growth is bounded elsewhere
+
public AgentInfoResponse isAgentUp(String agentId) {
if (AGENT_STREAM_MAPPING.containsKey(agentId) &&
@@ -34,6 +36,18 @@ public class AgentHandler extends AgentCommunicationServiceGrpc.AgentCommunicati
}
}
+ public AgentCommandResponse getAgentCommandResponse(String executionId) { // Builds the AgentCommandResponse view of whatever is cached for executionId
+ AgentCommandResponse agentCommandResponse = new AgentCommandResponse();
+ if (EXECUTION_RESPONSE_CACHE.containsKey(executionId)) { // a response has been recorded for this execution id
+
agentCommandResponse.setResponseString(EXECUTION_RESPONSE_CACHE.get(executionId).getResponseString()); // NOTE(review): second lookup after containsKey; would throw NullPointerException if the entry were removed in between (no removal is visible in this diff)
+ agentCommandResponse.setExecutionId(executionId);
+ agentCommandResponse.setAvailable(true);
+ } else { // nothing cached: only the availability flag is set, executionId and responseString stay null
+ agentCommandResponse.setAvailable(false);
+ }
+ return agentCommandResponse; // always a fresh, non-null object
+ }
+
public AgentTunnelAck runTunnelOnAgent(AgentTunnelCreationRequest
tunnelRequest) {
AgentTunnelAck ack = new AgentTunnelAck();
@@ -101,7 +115,8 @@ public class AgentHandler extends AgentCommunicationServiceGrpc.AgentCommunicati
}
private void handleCommandExecutionResponse (CommandExecutionResponse
commandExecutionResponse) { // Logs the incoming command execution response and stores it in EXECUTION_RESPONSE_CACHE
-
+ logger.info("Received command execution response for execution id {}",
commandExecutionResponse.getExecutionId());
+
EXECUTION_RESPONSE_CACHE.put(commandExecutionResponse.getExecutionId(),
commandExecutionResponse); // keyed by execution id; a later response carrying the same id replaces the earlier entry
}
private void handleContainerExecutionResponse (ContainerExecutionResponse
containerExecutionResponse) {
diff --git a/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentCommandResponse.java b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentCommandResponse.java
new file mode 100644
index 0000000000..74cfb4734f
--- /dev/null
+++ b/modules/agent-framework/connection-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentCommandResponse.java
@@ -0,0 +1,31 @@
+package org.apache.airavata.agent.connection.service.models;
+ // Mutable holder for the outcome of one command execution; used as the body type of the controller's execution lookup endpoint.
+public class AgentCommandResponse {
+ private String executionId; // id of the execution this response belongs to; null until set
+ private String responseString; // response text reported for the execution; null until set
+ private boolean isAvailable; // whether a response has been recorded for the execution id; defaults to false
+ // Returns the execution id, or null if none was set.
+ public String getExecutionId() {
+ return executionId;
+ }
+ // Sets the execution id this response refers to.
+ public void setExecutionId(String executionId) {
+ this.executionId = executionId;
+ }
+ // Returns the response text, or null if none was set.
+ public String getResponseString() {
+ return responseString;
+ }
+ // Sets the response text.
+ public void setResponseString(String responseString) {
+ this.responseString = responseString;
+ }
+ // Returns true when a response is available. NOTE(review): accessor pair is isAvailable()/setAvailable(), so bean-style serializers presumably expose the property as "available" - confirm.
+ public boolean isAvailable() {
+ return isAvailable;
+ }
+ // Marks whether a response is available for the execution id.
+ public void setAvailable(boolean available) {
+ isAvailable = available; // no "this." needed: the parameter name differs from the field name
+ }
+}