This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch hpdc-benchmark
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 638916072fc2747c17ad33c9a1a3deb847a5de15
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Wed Feb 5 21:20:12 2025 -0500

    Temp changes to hpdc benchmark
---
 .../service/controllers/AgentController.java       |  5 ++
 .../service/handlers/AgentConnectionHandler.java   | 81 +++++++++++++++++++++-
 .../agent-framework/{ => agent-simulator}/pom.xml  | 21 +++---
 .../src/main/java/RandomizedIntervalSimulator.java | 59 ++++++++++++++++
 .../src/main/java/ResourceScheduler.java           | 29 ++++++++
 .../src/main/java/SlurmConnector.java              | 52 ++++++++++++++
 modules/agent-framework/airavata-agent/agent.go    | 60 ++--------------
 modules/agent-framework/pom.xml                    |  1 +
 8 files changed, 243 insertions(+), 65 deletions(-)

diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
index c0380f7a2a..836d178d96 100644
--- 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java
@@ -82,6 +82,11 @@ public class AgentController {
         }
     }
 
+    @GetMapping("/runLoad")
+    public void runLoad() {
+        agentConnectionHandler.runLoad();
+    }
+
     @GetMapping("/executepythonresponse/{executionId}")
     public ResponseEntity<AgentPythonRunResponse> 
getPythonResponse(@PathVariable("executionId") String executionId) {
         return 
ResponseEntity.accepted().body(agentConnectionHandler.getPythonExecutionResponse(executionId));
diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java
index 5fdb003c89..ec5b6998ca 100644
--- 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java
@@ -13,8 +13,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 @GrpcService
@@ -117,6 +119,74 @@ public class AgentConnectionHandler extends 
AgentCommunicationServiceGrpc.AgentC
         return runResponse;
     }
 
+    private String loadCode = "import time\n" +
+            "\n" +
+            "print(\"Waiting for 30 seconds...\")\n" +
+            "time.sleep(30)\n" +
+            "print(\"Done waiting!\")";
+
+    private Map<String, String> BUSY_AGENTS_WITH_EXECUTION = new 
ConcurrentHashMap<>();
+    private Map<String, String> COMPLETED_AGENTS_WITH_EXECUTION = new 
ConcurrentHashMap<>();
+    private int loadSize = 20;
+    private AtomicInteger jobsTobeSubmitted = new AtomicInteger(0);
+    private AtomicInteger jobsCompleted = new AtomicInteger(0);
+
+    long startTime = 0;
+    public void runLoad() {
+        startTime = System.currentTimeMillis();
+        logger.info("Running load on agents");
+        jobsCompleted.set(0);
+        jobsTobeSubmitted.set(loadSize);
+        runLoadScheduling();
+    }
+
+    private synchronized void runLoadScheduling() {
+
+        for (String execId : COMPLETED_AGENTS_WITH_EXECUTION.keySet()) {
+            String agentId = COMPLETED_AGENTS_WITH_EXECUTION.get(execId);
+            String streamId = AGENT_STREAM_MAPPING.get(agentId);
+            if (ACTIVE_STREAMS.containsKey(streamId)) {
+                logger.info("Sending kill message to agent {}", agentId);
+                StreamObserver<ServerMessage> streamObserver = 
ACTIVE_STREAMS.get(streamId);
+                
streamObserver.onNext(ServerMessage.newBuilder().setPythonExecutionRequest(
+                        PythonExecutionRequest.newBuilder()
+                                .setExecutionId(UUID.randomUUID().toString())
+                                .setKeepAlive(true)
+                                .setCode("kill")
+                                .setWorkingDir("")).build());
+                COMPLETED_AGENTS_WITH_EXECUTION.remove(execId);
+            }
+        }
+
+        if (jobsTobeSubmitted.get() <= 0) {
+            logger.info("Loading agents completed");
+            return;
+        }
+        for (String agentId : AGENT_STREAM_MAPPING.keySet()) {
+
+            if (BUSY_AGENTS_WITH_EXECUTION.containsValue(agentId)) {
+                logger.info("Busy agent {}", agentId);
+                continue;
+            }
+
+            String streamId = AGENT_STREAM_MAPPING.get(agentId);
+            if (ACTIVE_STREAMS.containsKey(streamId)) {
+                String executionId = UUID.randomUUID().toString();
+                logger.info("Running load on agent {} for execution {}", 
agentId, executionId);
+                StreamObserver<ServerMessage> streamObserver = 
ACTIVE_STREAMS.get(streamId);
+                BUSY_AGENTS_WITH_EXECUTION.put(executionId, agentId);
+                
streamObserver.onNext(ServerMessage.newBuilder().setPythonExecutionRequest(
+                        PythonExecutionRequest.newBuilder()
+                                .setExecutionId(executionId)
+                                .setKeepAlive(true)
+                                .setCode(loadCode)
+                                .setWorkingDir("")).build());
+                jobsTobeSubmitted.decrementAndGet();
+            }
+        }
+    }
+
+
     public AgentPythonRunAck runPythonOnAgent(AgentPythonRunRequest 
pythonRunRequest) {
         String executionId = UUID.randomUUID().toString();
         AgentPythonRunAck ack = new AgentPythonRunAck();
@@ -218,8 +288,9 @@ public class AgentConnectionHandler extends 
AgentCommunicationServiceGrpc.AgentC
     }
 
     private void handleAgentPing(AgentPing agentPing, String streamId) {
-        logger.info("Received agent ping for agent id {}", 
agentPing.getAgentId());
+        logger.info("Received agent ping for agent id {} with stram {}", 
agentPing.getAgentId(), streamId);
         AGENT_STREAM_MAPPING.put(agentPing.getAgentId(), streamId);
+        runLoadScheduling();
     }
 
     private void handleCommandExecutionResponse (CommandExecutionResponse 
commandExecutionResponse) {
@@ -243,6 +314,14 @@ public class AgentConnectionHandler extends 
AgentCommunicationServiceGrpc.AgentC
     private void handlePythonExecutionResponse (PythonExecutionResponse 
executionResponse) {
         logger.info("Received python execution response for execution id {}", 
executionResponse.getExecutionId());
         
PYTHON_EXECUTION_RESPONSE_CACHE.put(executionResponse.getExecutionId(), 
executionResponse);
+        
//BUSY_AGENTS_WITH_EXECUTION.remove(executionResponse.getExecutionId());
+        COMPLETED_AGENTS_WITH_EXECUTION.put(executionResponse.getExecutionId(),
+                
BUSY_AGENTS_WITH_EXECUTION.get(executionResponse.getExecutionId()));
+        int completed = jobsCompleted.incrementAndGet();
+        if (completed == loadSize) {
+            logger.info("ALL JOBS " + loadSize +"  COMPLETED time " + 
(System.currentTimeMillis() - startTime)/1000 + "s");
+        }
+        runLoadScheduling();
     }
 
 
diff --git a/modules/agent-framework/pom.xml 
b/modules/agent-framework/agent-simulator/pom.xml
similarity index 55%
copy from modules/agent-framework/pom.xml
copy to modules/agent-framework/agent-simulator/pom.xml
index 0d19265b9c..327d518e67 100644
--- a/modules/agent-framework/pom.xml
+++ b/modules/agent-framework/agent-simulator/pom.xml
@@ -5,20 +5,23 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.airavata</groupId>
-        <artifactId>airavata</artifactId>
+        <artifactId>agent-framework</artifactId>
         <version>0.21-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>agent-framework</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>agent-service</module>
-    </modules>
+    <artifactId>agent-simulator</artifactId>
 
+    <dependencies>
+        <dependency>
+            <groupId>com.hierynomus</groupId>
+            <artifactId>sshj</artifactId>
+            <version>0.38.0</version>
+        </dependency>
+
+    </dependencies>
     <properties>
-        <maven.compiler.source>11</maven.compiler.source>
-        <maven.compiler.target>11</maven.compiler.target>
+        <maven.compiler.source>22</maven.compiler.source>
+        <maven.compiler.target>22</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
diff --git 
a/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java
 
b/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java
new file mode 100644
index 0000000000..19bf7bc4fd
--- /dev/null
+++ 
b/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java
@@ -0,0 +1,59 @@
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.UUID;
+
+public class RandomizedIntervalSimulator {
+
+    private int simulatorDelays[] = {120};
+    private String agentPath = 
"/Users/dwannipurage3/code/airavata/modules/agent-framework/airavata-agent/airavata-agent";
+    private String agentServiceHost = "localhost:19900";
+    private int totalJobs  = 20;
+
+    private void runAgent(String agentId) throws Exception {
+        ProcessBuilder processBuilder = new ProcessBuilder(agentPath, 
agentServiceHost, agentId);
+
+        processBuilder.redirectErrorStream(true);
+        Process process = processBuilder.start();
+        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.out.println(line);
+            }
+        }
+        int exitCode = process.waitFor();
+        System.out.println("Process exited with code: " + exitCode + " for 
agent " + agentId);
+    }
+
+    private void runSimulator(int interval) throws Exception {
+
+        Random random = new Random();
+        int randomInterval = random.nextInt(interval + 1);
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    System.out.println("Agent for interval " + interval + " 
starting in " + randomInterval);
+                    Thread.sleep(randomInterval * 1000);
+                    runAgent(interval + "-" + UUID.randomUUID().toString());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    public static void main(String[] args) throws Exception {
+        RandomizedIntervalSimulator simulator = new 
RandomizedIntervalSimulator();
+        for (int simulatorDelay : simulator.simulatorDelays) {
+            for (int i = 0; i < simulator.totalJobs; i ++) {
+                System.out.println("Simulating job " + i + " of delay " + 
simulatorDelay);
+                simulator.runSimulator(simulatorDelay);
+            }
+        }
+    }
+
+
+
+}
diff --git 
a/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java 
b/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java
new file mode 100644
index 0000000000..6ff4e578d8
--- /dev/null
+++ 
b/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java
@@ -0,0 +1,29 @@
+import java.util.HashMap;
+import java.util.Map;
+
+public class ResourceScheduler {
+
+    private String clusters[] = {"54.184.248.109"};
+    private String loginUserName = "ubuntu";
+    private String privateKey = "/Users/dwannipurage3/.ssh/cs_rsa_pkce";
+
+    Map<String, SlurmConnector> slurmConnectors = new HashMap<>();
+
+    public ResourceScheduler() {
+        for (String cluster : clusters) {
+            slurmConnectors.put(cluster, new SlurmConnector(cluster, 
loginUserName, privateKey));
+        }
+    }
+
+    public void createBurst(int jobsPerCluster) throws Exception {
+        for (int cluster = 0; cluster < clusters.length; ++cluster) {
+            for (int i = 0; i < jobsPerCluster; i++) {
+                String agentId = "cluster-" + cluster + "-Jobid" + i;
+                SlurmConnector slurmConnector = 
slurmConnectors.get(clusters[cluster]);
+                slurmConnector.submitAgentJob(agentId);
+            }
+        }
+    }
+
+
+}
diff --git 
a/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java 
b/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java
new file mode 100644
index 0000000000..4f54dee427
--- /dev/null
+++ b/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java
@@ -0,0 +1,52 @@
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+
+import java.io.*;
+
+public class SlurmConnector {
+
+    private String hostName;
+    private String loginUserName;
+    private String privateKeyPath;
+    final SSHClient ssh = new SSHClient();
+
+    public SlurmConnector(String hostName, String loginUserName, String 
privateKeyPath) {
+        this.hostName = hostName;
+        this.loginUserName = loginUserName;
+        this.privateKeyPath = privateKeyPath;
+    }
+
+    private static final Console con = System.console();
+
+    public void connect() throws Exception {
+        ssh.loadKnownHosts();
+        ssh.addHostKeyVerifier(new PromiscuousVerifier());
+        ssh.connect(hostName, 22);
+        ssh.authPublickey(loginUserName, privateKeyPath);
+    }
+
+    public String executeCommand(String command) throws Exception {
+        Session session = ssh.startSession();
+        final Session.Command cmd = session.exec(command);
+        String output = new String(cmd.getInputStream().readAllBytes());
+        session.close();
+        return output;
+    }
+
+    public int submitAgentJob(String agentId) throws Exception {
+        executeCommand("mkdir -p /home/ubuntu/jobs/" + agentId);
+        executeCommand("cp /home/ubuntu/example.slurm /home/ubuntu/jobs/" 
+agentId + "/job.slurm");
+        String commandOut = executeCommand("/opt/slurm/bin/sbatch 
/home/ubuntu/jobs/" + agentId + "/job.slurm");
+        System.out.println(commandOut);
+        return commandOut.startsWith("Submitted batch job ") ? 
Integer.parseInt(commandOut.trim().split(" ")[3]) : -1;
+    }
+
+    public static void main(String[] args) throws Exception {
+        SlurmConnector slurmConnector = new SlurmConnector("54.184.248.109", 
"ubuntu",
+                "/Users/dwannipurage3/.ssh/cs_rsa_pkce");
+        slurmConnector.connect();
+        int jobId = slurmConnector.submitAgentJob("agent1");
+        System.out.println("Job id " + jobId);
+    }
+}
diff --git a/modules/agent-framework/airavata-agent/agent.go 
b/modules/agent-framework/airavata-agent/agent.go
index 99d28c8088..de3c2277d0 100644
--- a/modules/agent-framework/airavata-agent/agent.go
+++ b/modules/agent-framework/airavata-agent/agent.go
@@ -2,7 +2,6 @@ package main
 
 import (
        protos "airavata-agent/protos"
-       "bufio"
        "bytes"
        "context"
        "encoding/json"
@@ -49,58 +48,6 @@ func main() {
                log.Printf("Connected to the server...")
        }
 
-       go func() {
-               log.Printf("Starting jupyter kernel")
-               cmd := exec.Command("python", "/opt/jupyter/kernel.py")
-               //cmd := exec.Command("jupyter/venv/bin/python", 
"jupyter/kernel.py")
-               stdout, err := cmd.StdoutPipe()
-
-               if err != nil {
-                       fmt.Println("[agent.go] Error creating StdoutPipe:", 
err)
-                       return
-               }
-
-               // Get stderr pipe
-               stderr, err := cmd.StderrPipe()
-               if err != nil {
-                       fmt.Println("[agent.go] Error creating StderrPipe:", 
err)
-                       return
-               }
-
-               log.Printf("[agent.go] Starting command for execution")
-               // Start the command
-               if err := cmd.Start(); err != nil {
-                       fmt.Println("[agent.go] Error starting command:", err)
-                       return
-               }
-
-               // Create channels to read from stdout and stderr
-               stdoutScanner := bufio.NewScanner(stdout)
-               stderrScanner := bufio.NewScanner(stderr)
-
-               // Stream stdout
-               go func() {
-                       for stdoutScanner.Scan() {
-                               fmt.Printf("[agent.go] stdout: %s\n", 
stdoutScanner.Text())
-                       }
-               }()
-
-               // Stream stderr
-               go func() {
-                       for stderrScanner.Scan() {
-                               fmt.Printf("[agent.go] stderr: %s\n", 
stderrScanner.Text())
-                       }
-               }()
-
-               // Wait for the command to finish
-               if err := cmd.Wait(); err != nil {
-                       fmt.Println("[agent.go] Error waiting for command:", 
err)
-                       return
-               }
-
-               fmt.Println("[agent.go] Command finished")
-       }()
-
        go func() {
                for {
                        in, err := stream.Recv()
@@ -127,6 +74,9 @@ func main() {
                                log.Printf("[agent.go] Working Dir %s", 
workingDir)
                                log.Printf("[agent.go] Libraries %s", libraries)
 
+                               if code == "kill" {
+                                       log.Fatalf("Killing the agent " + 
agentId + "  as instructed by server")
+                               }
                                // TODO: cd into working dir, create the 
virtual environment with provided libraries
                                cmd := exec.Command("python3", "-c", code) 
//TODO: Load python runtime from a config
 
@@ -139,8 +89,8 @@ func main() {
                                stdoutString := string(output)
                                if err := 
stream.Send(&protos.AgentMessage{Message: 
&protos.AgentMessage_PythonExecutionResponse{
                                        PythonExecutionResponse: 
&protos.PythonExecutionResponse{
-                                               SessionId: sessionId, 
-                                               ExecutionId: executionId, 
+                                               SessionId:      sessionId,
+                                               ExecutionId:    executionId,
                                                ResponseString: 
stdoutString}}}); err != nil {
                                        log.Printf("[agent.go] Failed to send 
execution result to server: %v", err)
                                }
diff --git a/modules/agent-framework/pom.xml b/modules/agent-framework/pom.xml
index 0d19265b9c..6626ce43b4 100644
--- a/modules/agent-framework/pom.xml
+++ b/modules/agent-framework/pom.xml
@@ -14,6 +14,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>agent-service</module>
+        <module>agent-simulator</module>
     </modules>
 
     <properties>

Reply via email to