This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch vizfold in repository https://gitbox.apache.org/repos/asf/airavata.git
commit dc8d9c64cd3e2d96477ac4201e91903bbd75524e Author: lahiruj <[email protected]> AuthorDate: Sun Oct 5 14:14:15 2025 -0400 job workload handling and persisting implementation --- .../connection/service/config/AsyncConfig.java | 40 +++++++ .../service/handlers/AgentManagementHandler.java | 15 ++- .../service/handlers/JobBatchHandler.java | 72 ++++++++++++ .../service/handlers/JobBatchWorker.java | 125 +++++++++++++++++++++ .../service/models/AgentLaunchResponse.java | 12 +- 5 files changed, 258 insertions(+), 6 deletions(-) diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java new file mode 100644 index 0000000000..17e3989fde --- /dev/null +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.agent.connection.service.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@EnableAsync +@Configuration +public class AsyncConfig { + + @Bean(name = "batchExecutor") + public ThreadPoolTaskExecutor batchExecutor() { + ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor(); + ex.setThreadNamePrefix("batch-"); + ex.setCorePoolSize(4); + ex.setMaxPoolSize(8); + ex.setQueueCapacity(1000); + ex.initialize(); + return ex; + } +} diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java index 5cdb5a179f..0d1090297b 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java @@ -54,6 +54,7 @@ public class AgentManagementHandler { private static final Logger LOGGER = LoggerFactory.getLogger(AgentManagementHandler.class); private final AiravataService airavataService; private final ClusterApplicationConfig clusterApplicationConfig; + private final JobBatchHandler jobBatchHandler; @Value("${airavata.storageResourceId}") private String storageResourceId; @@ -64,9 +65,11 @@ public class AgentManagementHandler { @Value("${grpc.server.host}") private String grpcHost; - public AgentManagementHandler(AiravataService airavataService, ClusterApplicationConfig clusterApplicationConfig) { + public AgentManagementHandler(AiravataService airavataService, ClusterApplicationConfig clusterApplicationConfig, + JobBatchHandler jobBatchHandler) { this.airavataService = airavataService; this.clusterApplicationConfig = clusterApplicationConfig; + this.jobBatchHandler = jobBatchHandler; } public AgentTerminateResponse terminateExperiment(String experimentId) { @@ -167,10 +170,12 @@ public class AgentManagementHandler { .airavata() .createExperiment(UserContext.authzToken(), experiment.getGatewayId(), experiment); LOGGER.info("Launching the application, Id: {}, Name: {}", experimentId, experiment.getExperimentName()); - airavataService - .airavata() - .launchExperiment(UserContext.authzToken(), experimentId, experiment.getGatewayId()); - return new AgentLaunchResponse(agentId, experimentId, envName); + + // Handle job workload + String batchId = jobBatchHandler.handleJobWorkload(experimentId, req.getJobBatchSpec()); + + airavataService.airavata().launchExperiment(UserContext.authzToken(), experimentId, experiment.getGatewayId()); + return new AgentLaunchResponse(agentId, experimentId, envName, batchId); } catch (TException e) { LOGGER.error("Error while creating the experiment with the name: {}", req.getExperimentName(), e); throw new RuntimeException( diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java new file mode 100644 index 0000000000..0c8e790b05 --- /dev/null +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.agent.connection.service.handlers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.airavata.agent.connection.service.db.entity.JobBatchEntity; +import org.apache.airavata.agent.connection.service.db.repo.JobBatchRepo; +import org.apache.airavata.agent.connection.service.models.JobBatchSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +@Service +public class JobBatchHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobBatchHandler.class); + + private final JobBatchWorker jobBatchWorker; + private final JobBatchRepo jobBatchRepo; + private final ObjectMapper objectMapper; + + public JobBatchHandler(JobBatchWorker jobBatchWorker, JobBatchRepo jobBatchRepo, ObjectMapper objectMapper) { + this.jobBatchWorker = jobBatchWorker; + this.jobBatchRepo = jobBatchRepo; + this.objectMapper = objectMapper; + } + + public String handleJobWorkload(String experimentId, JobBatchSpec spec) { + + String batchId = null; + if (spec != null) { + if (spec.getApplicationCommand() == null || spec.getApplicationCommand().isBlank()) { + LOGGER.warn("application_command is required for experiment with id: {}", experimentId); + throw new IllegalArgumentException("application_command is required for experiment with id: " + experimentId); + } + + if (spec.getParameterGrid() == null || spec.getParameterGrid().isEmpty()) { + LOGGER.warn("parameter_grid is required for experiment with id: {}", experimentId); + throw new IllegalArgumentException("parameter_grid is required for experiment with id: " + experimentId); + } + + batchId = UUID.randomUUID().toString(); + JobBatchEntity batch = new JobBatchEntity(); + batch.setId(batchId); + batch.setExperimentId(experimentId); + batch.setCommandTemplate(spec.getApplicationCommand()); + batch.setPayloadJson(objectMapper.valueToTree(spec)); + jobBatchRepo.save(batch); + + jobBatchWorker.expandAndPersistUnitsAsync(experimentId, batchId, spec.getApplicationCommand(), spec.getParameterGrid()); + } + return batchId; + } +} diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java new file mode 100644 index 0000000000..ca8a97f37f --- /dev/null +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.agent.connection.service.handlers; + +import jakarta.transaction.Transactional; +import org.apache.airavata.agent.connection.service.db.entity.JobBatchEntity; +import org.apache.airavata.agent.connection.service.db.entity.JobUnitEntity; +import org.apache.airavata.agent.connection.service.db.repo.JobUnitRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Service("jobBatchWorker") +public class JobBatchWorker { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobBatchWorker.class); + + private final JobUnitRepo jobUnitRepo; + + public JobBatchWorker(JobUnitRepo jobUnitRepo) { + this.jobUnitRepo = jobUnitRepo; + } + + /** + * Expands the parameter grid and persists JOB_UNIT rows in batch + */ + @Async("batchExecutor") + @Transactional(dontRollbackOn = Exception.class) + public void expandAndPersistUnitsAsync(String experimentId, String batchId, + String commandTemplate, Map<String, List<String>> grid) { + + if (grid == null || grid.isEmpty()) { + return; + } + + List<String> keys = new ArrayList<>(grid.keySet()); + Collections.sort(keys); + + List<String[]> values = new ArrayList<>(keys.size()); + for (String k : keys) { + List<String> vs = grid.get(k); + if (vs == null || vs.isEmpty()) { + LOGGER.warn("Parameter '{}' has empty value list; skipping batch {}", k, batchId); + return; + } + values.add(vs.toArray(new String[0])); + } + + final int chunkSize = 100; + List<JobUnitEntity> buffer = new ArrayList<>(chunkSize); + + int dims = values.size(); + int[] idx = new int[dims]; + boolean done = false; + + while (!done) { + String resolved = renderCommand(commandTemplate, keys, values, idx); + + JobUnitEntity jobUnit = new JobUnitEntity(); + jobUnit.setId(UUID.randomUUID().toString()); + jobUnit.setExperimentId(experimentId); + JobBatchEntity batchRef = new JobBatchEntity(); + batchRef.setId(batchId); + jobUnit.setBatch(batchRef); + jobUnit.setResolvedCommand(resolved); + buffer.add(jobUnit); + + if (buffer.size() >= chunkSize) { + jobUnitRepo.saveAll(buffer); + buffer.clear(); + } + + for (int d = dims - 1; d >= 0; d--) { + idx[d]++; + if (idx[d] < values.get(d).length) { + break; + } else { + idx[d] = 0; + if (d == 0) { + done = true; + } + } + } + } + + if (!buffer.isEmpty()) { + jobUnitRepo.saveAll(buffer); + } + + LOGGER.info("Batch {} expansion complete (experiment {}).", batchId, experimentId); + } + + + private static String renderCommand(String template, List<String> keys, List<String[]> values, int[] idx) { + String cmd = template; + for (int i = 0; i < keys.size(); i++) { + String placeholder = "{" + keys.get(i) + "}"; + cmd = cmd.replace(placeholder, values.get(i)[idx[i]]); + } + return cmd; + } +} diff --git a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java index d3c6a8077d..ed5907b5da 100644 --- a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java +++ b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java @@ -24,11 +24,13 @@ public class AgentLaunchResponse { private String experimentId; private String envName; private String processId; + private String batchId; - public AgentLaunchResponse(String agentId, String experimentId, String envName) { + public AgentLaunchResponse(String agentId, String experimentId, String envName, String batchId) { this.agentId = agentId; this.experimentId = experimentId; this.envName = envName; + this.batchId = batchId; } public String getAgentId() { @@ -62,4 +64,12 @@ public class AgentLaunchResponse { public void setProcessId(String processId) { this.processId = processId; } + + public String getBatchId() { + return batchId; + } + + public void setBatchId(String batchId) { + this.batchId = batchId; + } }
