This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch hpdc-benchmark in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 638916072fc2747c17ad33c9a1a3deb847a5de15 Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Wed Feb 5 21:20:12 2025 -0500 Temp changes to hpdc benchmark --- .../service/controllers/AgentController.java | 5 ++ .../service/handlers/AgentConnectionHandler.java | 81 +++++++++++++++++++++- .../agent-framework/{ => agent-simulator}/pom.xml | 21 +++--- .../src/main/java/RandomizedIntervalSimulator.java | 59 ++++++++++++++++ .../src/main/java/ResourceScheduler.java | 29 ++++++++ .../src/main/java/SlurmConnector.java | 52 ++++++++++++++ modules/agent-framework/airavata-agent/agent.go | 60 ++-------------- modules/agent-framework/pom.xml | 1 + 8 files changed, 243 insertions(+), 65 deletions(-) diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java index c0380f7a2a..836d178d96 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/controllers/AgentController.java @@ -82,6 +82,11 @@ public class AgentController { } } + @GetMapping("/runLoad") + public void runLoad() { + agentConnectionHandler.runLoad(); + } + @GetMapping("/executepythonresponse/{executionId}") public ResponseEntity<AgentPythonRunResponse> getPythonResponse(@PathVariable("executionId") String executionId) { return ResponseEntity.accepted().body(agentConnectionHandler.getPythonExecutionResponse(executionId)); diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java index 5fdb003c89..ec5b6998ca 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentConnectionHandler.java @@ -13,8 +13,10 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @GrpcService @@ -117,6 +119,74 @@ public class AgentConnectionHandler extends AgentCommunicationServiceGrpc.AgentC return runResponse; } + private String loadCode = "import time\n" + + "\n" + + "print(\"Waiting for 30 seconds...\")\n" + + "time.sleep(30)\n" + + "print(\"Done waiting!\")"; + + private Map<String, String> BUSY_AGENTS_WITH_EXECUTION = new ConcurrentHashMap<>(); + private Map<String, String> COMPLETED_AGENTS_WITH_EXECUTION = new ConcurrentHashMap<>(); + private int loadSize = 20; + private AtomicInteger jobsTobeSubmitted = new AtomicInteger(0); + private AtomicInteger jobsCompleted = new AtomicInteger(0); + + long startTime = 0; + public void runLoad() { + startTime = System.currentTimeMillis(); + logger.info("Running load on agents"); + jobsCompleted.set(0); + jobsTobeSubmitted.set(loadSize); + runLoadScheduling(); + } + + private synchronized void runLoadScheduling() { + + for (String execId : COMPLETED_AGENTS_WITH_EXECUTION.keySet()) { + String agentId = COMPLETED_AGENTS_WITH_EXECUTION.get(execId); + String streamId = AGENT_STREAM_MAPPING.get(agentId); + if (ACTIVE_STREAMS.containsKey(streamId)) { + logger.info("Sending kill message to agent {}", agentId); + StreamObserver<ServerMessage> streamObserver = ACTIVE_STREAMS.get(streamId); + streamObserver.onNext(ServerMessage.newBuilder().setPythonExecutionRequest( + PythonExecutionRequest.newBuilder() + .setExecutionId(UUID.randomUUID().toString()) + .setKeepAlive(true) + .setCode("kill") + .setWorkingDir("")).build()); + COMPLETED_AGENTS_WITH_EXECUTION.remove(execId); + } + } + + if (jobsTobeSubmitted.get() <= 0) { + logger.info("Loading agents completed"); + return; + } + for (String agentId : AGENT_STREAM_MAPPING.keySet()) { + + if (BUSY_AGENTS_WITH_EXECUTION.containsValue(agentId)) { + logger.info("Busy agent {}", agentId); + continue; + } + + String streamId = AGENT_STREAM_MAPPING.get(agentId); + if (ACTIVE_STREAMS.containsKey(streamId)) { + String executionId = UUID.randomUUID().toString(); + logger.info("Running load on agent {} for execution {}", agentId, executionId); + StreamObserver<ServerMessage> streamObserver = ACTIVE_STREAMS.get(streamId); + BUSY_AGENTS_WITH_EXECUTION.put(executionId, agentId); + streamObserver.onNext(ServerMessage.newBuilder().setPythonExecutionRequest( + PythonExecutionRequest.newBuilder() + .setExecutionId(executionId) + .setKeepAlive(true) + .setCode(loadCode) + .setWorkingDir("")).build()); + jobsTobeSubmitted.decrementAndGet(); + } + } + } + + public AgentPythonRunAck runPythonOnAgent(AgentPythonRunRequest pythonRunRequest) { String executionId = UUID.randomUUID().toString(); AgentPythonRunAck ack = new AgentPythonRunAck(); @@ -218,8 +288,9 @@ public class AgentConnectionHandler extends AgentCommunicationServiceGrpc.AgentC } private void handleAgentPing(AgentPing agentPing, String streamId) { - logger.info("Received agent ping for agent id {}", agentPing.getAgentId()); + logger.info("Received agent ping for agent id {} with stram {}", agentPing.getAgentId(), streamId); AGENT_STREAM_MAPPING.put(agentPing.getAgentId(), streamId); + runLoadScheduling(); } private void handleCommandExecutionResponse (CommandExecutionResponse commandExecutionResponse) { @@ -243,6 +314,14 @@ public class AgentConnectionHandler extends AgentCommunicationServiceGrpc.AgentC private void handlePythonExecutionResponse (PythonExecutionResponse executionResponse) { logger.info("Received python execution response for execution id {}", executionResponse.getExecutionId()); PYTHON_EXECUTION_RESPONSE_CACHE.put(executionResponse.getExecutionId(), executionResponse); + //BUSY_AGENTS_WITH_EXECUTION.remove(executionResponse.getExecutionId()); + COMPLETED_AGENTS_WITH_EXECUTION.put(executionResponse.getExecutionId(), + BUSY_AGENTS_WITH_EXECUTION.get(executionResponse.getExecutionId())); + int completed = jobsCompleted.incrementAndGet(); + if (completed == loadSize) { + logger.info("ALL JOBS " + loadSize +" COMPLETED time " + (System.currentTimeMillis() - startTime)/1000 + "s"); + } + runLoadScheduling(); } diff --git a/modules/agent-framework/pom.xml b/modules/agent-framework/agent-simulator/pom.xml similarity index 55% copy from modules/agent-framework/pom.xml copy to modules/agent-framework/agent-simulator/pom.xml index 0d19265b9c..327d518e67 100644 --- a/modules/agent-framework/pom.xml +++ b/modules/agent-framework/agent-simulator/pom.xml @@ -5,20 +5,23 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.airavata</groupId> - <artifactId>airavata</artifactId> + <artifactId>agent-framework</artifactId> <version>0.21-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> </parent> - <artifactId>agent-framework</artifactId> - <packaging>pom</packaging> - <modules> - <module>agent-service</module> - </modules> + <artifactId>agent-simulator</artifactId> + <dependencies> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>sshj</artifactId> + <version>0.38.0</version> + </dependency> + + </dependencies> <properties> - <maven.compiler.source>11</maven.compiler.source> - <maven.compiler.target>11</maven.compiler.target> + <maven.compiler.source>22</maven.compiler.source> + <maven.compiler.target>22</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> diff --git a/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java b/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java new file mode 100644 index 0000000000..19bf7bc4fd --- /dev/null +++ b/modules/agent-framework/agent-simulator/src/main/java/RandomizedIntervalSimulator.java @@ -0,0 +1,59 @@ +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Random; +import java.util.UUID; + +public class RandomizedIntervalSimulator { + + private int simulatorDelays[] = {120}; + private String agentPath = "/Users/dwannipurage3/code/airavata/modules/agent-framework/airavata-agent/airavata-agent"; + private String agentServiceHost = "localhost:19900"; + private int totalJobs = 20; + + private void runAgent(String agentId) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(agentPath, agentServiceHost, agentId); + + processBuilder.redirectErrorStream(true); + Process process = processBuilder.start(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + int exitCode = process.waitFor(); + System.out.println("Process exited with code: " + exitCode + " for agent " + agentId); + } + + private void runSimulator(int interval) throws Exception { + + Random random = new Random(); + int randomInterval = random.nextInt(interval + 1); + + new Thread(new Runnable() { + @Override + public void run() { + try { + System.out.println("Agent for interval " + interval + " starting in " + randomInterval); + Thread.sleep(randomInterval * 1000); + runAgent(interval + "-" + UUID.randomUUID().toString()); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + } + + public static void main(String[] args) throws Exception { + RandomizedIntervalSimulator simulator = new RandomizedIntervalSimulator(); + for (int simulatorDelay : simulator.simulatorDelays) { + for (int i = 0; i < simulator.totalJobs; i ++) { + System.out.println("Simulating job " + i + " of delay " + simulatorDelay); + simulator.runSimulator(simulatorDelay); + } + } + } + + + +} diff --git a/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java b/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java new file mode 100644 index 0000000000..6ff4e578d8 --- /dev/null +++ b/modules/agent-framework/agent-simulator/src/main/java/ResourceScheduler.java @@ -0,0 +1,29 @@ +import java.util.HashMap; +import java.util.Map; + +public class ResourceScheduler { + + private String clusters[] = {"54.184.248.109"}; + private String loginUserName = "ubuntu"; + private String privateKey = "/Users/dwannipurage3/.ssh/cs_rsa_pkce"; + + Map<String, SlurmConnector> slurmConnectors = new HashMap<>(); + + public ResourceScheduler() { + for (String cluster : clusters) { + slurmConnectors.put(cluster, new SlurmConnector(cluster, loginUserName, privateKey)); + } + } + + public void createBurst(int jobsPerCluster) throws Exception { + for (int cluster = 0; cluster < clusters.length; ++cluster) { + for (int i = 0; i < jobsPerCluster; i++) { + String agentId = "cluster-" + cluster + "-Jobid" + i; + SlurmConnector slurmConnector = slurmConnectors.get(clusters[cluster]); + slurmConnector.submitAgentJob(agentId); + } + } + } + + +} diff --git a/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java b/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java new file mode 100644 index 0000000000..4f54dee427 --- /dev/null +++ b/modules/agent-framework/agent-simulator/src/main/java/SlurmConnector.java @@ -0,0 +1,52 @@ +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.connection.channel.direct.Session; +import net.schmizz.sshj.transport.verification.PromiscuousVerifier; + +import java.io.*; + +public class SlurmConnector { + + private String hostName; + private String loginUserName; + private String privateKeyPath; + final SSHClient ssh = new SSHClient(); + + public SlurmConnector(String hostName, String loginUserName, String privateKeyPath) { + this.hostName = hostName; + this.loginUserName = loginUserName; + this.privateKeyPath = privateKeyPath; + } + + private static final Console con = System.console(); + + public void connect() throws Exception { + ssh.loadKnownHosts(); + ssh.addHostKeyVerifier(new PromiscuousVerifier()); + ssh.connect(hostName, 22); + ssh.authPublickey(loginUserName, privateKeyPath); + } + + public String executeCommand(String command) throws Exception { + Session session = ssh.startSession(); + final Session.Command cmd = session.exec(command); + String output = new String(cmd.getInputStream().readAllBytes()); + session.close(); + return output; + } + + public int submitAgentJob(String agentId) throws Exception { + executeCommand("mkdir -p /home/ubuntu/jobs/" + agentId); + executeCommand("cp /home/ubuntu/example.slurm /home/ubuntu/jobs/" +agentId + "/job.slurm"); + String commandOut = executeCommand("/opt/slurm/bin/sbatch /home/ubuntu/jobs/" + agentId + "/job.slurm"); + System.out.println(commandOut); + return commandOut.startsWith("Submitted batch job ") ? Integer.parseInt(commandOut.trim().split(" ")[3]) : -1; + } + + public static void main(String[] args) throws Exception { + SlurmConnector slurmConnector = new SlurmConnector("54.184.248.109", "ubuntu", + "/Users/dwannipurage3/.ssh/cs_rsa_pkce"); + slurmConnector.connect(); + int jobId = slurmConnector.submitAgentJob("agent1"); + System.out.println("Job id " + jobId); + } +} diff --git a/modules/agent-framework/airavata-agent/agent.go b/modules/agent-framework/airavata-agent/agent.go index 99d28c8088..de3c2277d0 100644 --- a/modules/agent-framework/airavata-agent/agent.go +++ b/modules/agent-framework/airavata-agent/agent.go @@ -2,7 +2,6 @@ package main import ( protos "airavata-agent/protos" - "bufio" "bytes" "context" "encoding/json" @@ -49,58 +48,6 @@ func main() { log.Printf("Connected to the server...") } - go func() { - log.Printf("Starting jupyter kernel") - cmd := exec.Command("python", "/opt/jupyter/kernel.py") - //cmd := exec.Command("jupyter/venv/bin/python", "jupyter/kernel.py") - stdout, err := cmd.StdoutPipe() - - if err != nil { - fmt.Println("[agent.go] Error creating StdoutPipe:", err) - return - } - - // Get stderr pipe - stderr, err := cmd.StderrPipe() - if err != nil { - fmt.Println("[agent.go] Error creating StderrPipe:", err) - return - } - - log.Printf("[agent.go] Starting command for execution") - // Start the command - if err := cmd.Start(); err != nil { - fmt.Println("[agent.go] Error starting command:", err) - return - } - - // Create channels to read from stdout and stderr - stdoutScanner := bufio.NewScanner(stdout) - stderrScanner := bufio.NewScanner(stderr) - - // Stream stdout - go func() { - for stdoutScanner.Scan() { - fmt.Printf("[agent.go] stdout: %s\n", stdoutScanner.Text()) - } - }() - - // Stream stderr - go func() { - for stderrScanner.Scan() { - fmt.Printf("[agent.go] stderr: %s\n", stderrScanner.Text()) - } - }() - - // Wait for the command to finish - if err := cmd.Wait(); err != nil { - fmt.Println("[agent.go] Error waiting for command:", err) - return - } - - fmt.Println("[agent.go] Command finished") - }() - go func() { for { in, err := stream.Recv() @@ -127,6 +74,9 @@ func main() { log.Printf("[agent.go] Working Dir %s", workingDir) log.Printf("[agent.go] Libraries %s", libraries) + if code == "kill" { + log.Fatalf("Killing the agent " + agentId + " as instructed by server") + } // TODO: cd into working dir, create the virtual environment with provided libraries cmd := exec.Command("python3", "-c", code) //TODO: Load python runtime from a config @@ -139,8 +89,8 @@ func main() { stdoutString := string(output) if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_PythonExecutionResponse{ PythonExecutionResponse: &protos.PythonExecutionResponse{ - SessionId: sessionId, - ExecutionId: executionId, + SessionId: sessionId, + ExecutionId: executionId, ResponseString: stdoutString}}}); err != nil { log.Printf("[agent.go] Failed to send execution result to server: %v", err) } diff --git a/modules/agent-framework/pom.xml b/modules/agent-framework/pom.xml index 0d19265b9c..6626ce43b4 100644 --- a/modules/agent-framework/pom.xml +++ b/modules/agent-framework/pom.xml @@ -14,6 +14,7 @@ <packaging>pom</packaging> <modules> <module>agent-service</module> + <module>agent-simulator</module> </modules> <properties>
