This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch vizfold in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 928bb1b6e119c189f619672f32ae59aa06463f5d Author: lahiruj <[email protected]> AuthorDate: Sun Oct 5 15:52:24 2025 -0400 agent batch job assigning models --- .../init/07-parameter-sweep-modeling.sql | 15 ++++- .../db/entity/AgentBatchAssignmentEntity.java | 76 ++++++++++++++++++++++ .../service/db/repo/AgentBatchAssignmentRepo.java | 29 +++++++++ .../service/handlers/AgentManagementHandler.java | 2 +- .../service/handlers/JobBatchHandler.java | 14 +++- 5 files changed, 132 insertions(+), 4 deletions(-) diff --git a/.devcontainer/database_scripts/init/07-parameter-sweep-modeling.sql b/.devcontainer/database_scripts/init/07-parameter-sweep-modeling.sql index 8bbe2e0372..cfb8173deb 100644 --- a/.devcontainer/database_scripts/init/07-parameter-sweep-modeling.sql +++ b/.devcontainer/database_scripts/init/07-parameter-sweep-modeling.sql @@ -20,7 +20,7 @@ CREATE TABLE IF NOT EXISTS `JOB_UNIT` `EXPERIMENT_ID` VARCHAR(255) NOT NULL, `CREATED_AT` TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), `RESOLVED_COMMAND` TEXT NOT NULL, -- fully expanded command to run - `STATUS` ENUM('PENDING','IN_PROGRESS','COMPLETED','FAILED') NOT NULL DEFAULT 'PENDING', + `STATUS` ENUM ('PENDING','IN_PROGRESS','COMPLETED','FAILED') NOT NULL DEFAULT 'PENDING', `AGENT_ID` VARCHAR(255) NULL, `STARTED_AT` TIMESTAMP(6) NULL, `COMPLETED_AT` TIMESTAMP(6) NULL, @@ -29,4 +29,17 @@ CREATE TABLE IF NOT EXISTS `JOB_UNIT` KEY `IDX_UNIT_EXP_STATUS` (`EXPERIMENT_ID`, `STATUS`), CONSTRAINT `FK_JOB_UNIT_BATCH` FOREIGN KEY (`BATCH_ID`) REFERENCES `JOB_BATCH` (`ID`) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +-- Agent Batch Job assignment +CREATE TABLE IF NOT EXISTS `AGENT_BATCH_ASSIGNMENT` +( + `AGENT_ID` VARCHAR(255) NOT NULL, + `EXPERIMENT_ID` VARCHAR(255) NOT NULL, + `BATCH_ID` VARCHAR(255) NOT NULL, + `CREATED_AT` TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + PRIMARY KEY (`AGENT_ID`), + KEY `IDX_ASSIGN_BATCH` (`BATCH_ID`), + CONSTRAINT `FK_ASSIGN_BATCH` FOREIGN KEY (`BATCH_ID`) + REFERENCES `JOB_BATCH` (`ID`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; \ No newline at end of file diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/entity/AgentBatchAssignmentEntity.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/entity/AgentBatchAssignmentEntity.java new file mode 100644 index 0000000000..7e2a2a2611 --- /dev/null +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/entity/AgentBatchAssignmentEntity.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.agent.connection.service.db.entity; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import java.time.Instant; + +@Entity +@Table(name = "AGENT_BATCH_ASSIGNMENT") +public class AgentBatchAssignmentEntity { + + @Id + @Column(name = "AGENT_ID", nullable = false) + private String agentId; + + @Column(name = "EXPERIMENT_ID", nullable = false) + private String experimentId; + + @Column(name = "BATCH_ID", nullable = false) + private String batchId; + + @Column(name = "CREATED_AT", insertable = false, updatable = false) + private Instant createdAt; + + public String getAgentId() { + return agentId; + } + + public void setAgentId(String agentId) { + this.agentId = agentId; + } + + public String getExperimentId() { + return experimentId; + } + + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } + + public String getBatchId() { + return batchId; + } + + public void setBatchId(String batchId) { + this.batchId = batchId; + } + + public Instant getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(Instant createdAt) { + this.createdAt = createdAt; + } +} diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/repo/AgentBatchAssignmentRepo.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/repo/AgentBatchAssignmentRepo.java new file mode 100644 index 0000000000..5fec2fb706 --- /dev/null +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/db/repo/AgentBatchAssignmentRepo.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.agent.connection.service.db.repo; + +import org.apache.airavata.agent.connection.service.db.entity.AgentBatchAssignmentEntity; +import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.Optional; + +public interface AgentBatchAssignmentRepo extends JpaRepository<AgentBatchAssignmentEntity, String> { + + Optional<AgentBatchAssignmentEntity> findByAgentId(String agentId); +} diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java index 0d1090297b..f6b55beb45 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java @@ -172,7 +172,7 @@ public class AgentManagementHandler { LOGGER.info("Launching the application, Id: {}, Name: {}", experimentId, experiment.getExperimentName()); // Handle job workload - String batchId = jobBatchHandler.handleJobWorkload(experimentId, req.getJobBatchSpec()); + String batchId = jobBatchHandler.handleJobWorkload(experimentId, agentId, req.getJobBatchSpec()); airavataService.airavata().launchExperiment(UserContext.authzToken(), experimentId, experiment.getGatewayId()); return new AgentLaunchResponse(agentId, experimentId, envName, batchId); diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java index 0c8e790b05..157321dd82 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java @@ -19,7 +19,9 @@ package org.apache.airavata.agent.connection.service.handlers; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.airavata.agent.connection.service.db.entity.AgentBatchAssignmentEntity; import org.apache.airavata.agent.connection.service.db.entity.JobBatchEntity; +import org.apache.airavata.agent.connection.service.db.repo.AgentBatchAssignmentRepo; import org.apache.airavata.agent.connection.service.db.repo.JobBatchRepo; import org.apache.airavata.agent.connection.service.models.JobBatchSpec; import org.slf4j.Logger; @@ -35,15 +37,17 @@ public class JobBatchHandler { private final JobBatchWorker jobBatchWorker; private final JobBatchRepo jobBatchRepo; + private final AgentBatchAssignmentRepo agentJobAssignmentRepo; private final ObjectMapper objectMapper; - public JobBatchHandler(JobBatchWorker jobBatchWorker, JobBatchRepo jobBatchRepo, ObjectMapper objectMapper) { + public JobBatchHandler(JobBatchWorker jobBatchWorker, JobBatchRepo jobBatchRepo, AgentBatchAssignmentRepo agentJobAssignmentRepo, ObjectMapper objectMapper) { this.jobBatchWorker = jobBatchWorker; this.jobBatchRepo = jobBatchRepo; + this.agentJobAssignmentRepo = agentJobAssignmentRepo; this.objectMapper = objectMapper; } - public String handleJobWorkload(String experimentId, JobBatchSpec spec) { + public String handleJobWorkload(String experimentId, String agentId, JobBatchSpec spec) { String batchId = null; if (spec != null) { @@ -65,6 +69,12 @@ public class JobBatchHandler { batch.setPayloadJson(objectMapper.valueToTree(spec)); jobBatchRepo.save(batch); + AgentBatchAssignmentEntity assign = new AgentBatchAssignmentEntity(); + assign.setAgentId(agentId); + assign.setExperimentId(experimentId); + assign.setBatchId(batchId); + agentJobAssignmentRepo.save(assign); + jobBatchWorker.expandAndPersistUnitsAsync(experimentId, batchId, spec.getApplicationCommand(), spec.getParameterGrid()); } return batchId;
