This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch vizfold
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit dc8d9c64cd3e2d96477ac4201e91903bbd75524e
Author: lahiruj <[email protected]>
AuthorDate: Sun Oct 5 14:14:15 2025 -0400

    job workload handling and persisting implementation
---
 .../connection/service/config/AsyncConfig.java     |  40 +++++++
 .../service/handlers/AgentManagementHandler.java   |  15 ++-
 .../service/handlers/JobBatchHandler.java          |  72 ++++++++++++
 .../service/handlers/JobBatchWorker.java           | 125 +++++++++++++++++++++
 .../service/models/AgentLaunchResponse.java        |  12 +-
 5 files changed, 258 insertions(+), 6 deletions(-)

diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java
new file mode 100644
index 0000000000..17e3989fde
--- /dev/null
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/config/AsyncConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agent.connection.service.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@EnableAsync
+@Configuration
+public class AsyncConfig {
+
+    @Bean(name = "batchExecutor")
+    public ThreadPoolTaskExecutor batchExecutor() {
+        ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
+        ex.setThreadNamePrefix("batch-");
+        ex.setCorePoolSize(4);
+        ex.setMaxPoolSize(8);
+        ex.setQueueCapacity(1000);
+        ex.initialize();
+        return ex;
+    }
+}
diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java
index 5cdb5a179f..0d1090297b 100644
--- 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/AgentManagementHandler.java
@@ -54,6 +54,7 @@ public class AgentManagementHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentManagementHandler.class);
     private final AiravataService airavataService;
     private final ClusterApplicationConfig clusterApplicationConfig;
+    private final JobBatchHandler jobBatchHandler;
 
     @Value("${airavata.storageResourceId}")
     private String storageResourceId;
@@ -64,9 +65,11 @@ public class AgentManagementHandler {
     @Value("${grpc.server.host}")
     private String grpcHost;
 
-    public AgentManagementHandler(AiravataService airavataService, 
ClusterApplicationConfig clusterApplicationConfig) {
+    public AgentManagementHandler(AiravataService airavataService, 
ClusterApplicationConfig clusterApplicationConfig,
+                                  JobBatchHandler jobBatchHandler) {
         this.airavataService = airavataService;
         this.clusterApplicationConfig = clusterApplicationConfig;
+        this.jobBatchHandler = jobBatchHandler;
     }
 
     public AgentTerminateResponse terminateExperiment(String experimentId) {
@@ -167,10 +170,12 @@ public class AgentManagementHandler {
                     .airavata()
                     .createExperiment(UserContext.authzToken(), 
experiment.getGatewayId(), experiment);
             LOGGER.info("Launching the application, Id: {}, Name: {}", 
experimentId, experiment.getExperimentName());
-            airavataService
-                    .airavata()
-                    .launchExperiment(UserContext.authzToken(), experimentId, 
experiment.getGatewayId());
-            return new AgentLaunchResponse(agentId, experimentId, envName);
+
+            // Handle job workload
+            String batchId = jobBatchHandler.handleJobWorkload(experimentId, 
req.getJobBatchSpec());
+
+            
airavataService.airavata().launchExperiment(UserContext.authzToken(), 
experimentId, experiment.getGatewayId());
+            return new AgentLaunchResponse(agentId, experimentId, envName, 
batchId);
         } catch (TException e) {
             LOGGER.error("Error while creating the experiment with the name: 
{}", req.getExperimentName(), e);
             throw new RuntimeException(
diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java
new file mode 100644
index 0000000000..0c8e790b05
--- /dev/null
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchHandler.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agent.connection.service.handlers;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.airavata.agent.connection.service.db.entity.JobBatchEntity;
+import org.apache.airavata.agent.connection.service.db.repo.JobBatchRepo;
+import org.apache.airavata.agent.connection.service.models.JobBatchSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.UUID;
+
+@Service
+public class JobBatchHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobBatchHandler.class);
+
+    private final JobBatchWorker jobBatchWorker;
+    private final JobBatchRepo jobBatchRepo;
+    private final ObjectMapper objectMapper;
+
+    public JobBatchHandler(JobBatchWorker jobBatchWorker, JobBatchRepo 
jobBatchRepo, ObjectMapper objectMapper) {
+        this.jobBatchWorker = jobBatchWorker;
+        this.jobBatchRepo = jobBatchRepo;
+        this.objectMapper = objectMapper;
+    }
+
+    public String handleJobWorkload(String experimentId, JobBatchSpec spec) {
+
+        String batchId = null;
+        if (spec != null) {
+            if (spec.getApplicationCommand() == null || 
spec.getApplicationCommand().isBlank()) {
+                LOGGER.warn("application_command is required for experiment 
with id: {}", experimentId);
+                throw new IllegalArgumentException("application_command is 
required for experiment with id: " + experimentId);
+            }
+
+            if (spec.getParameterGrid() == null || 
spec.getParameterGrid().isEmpty()) {
+                LOGGER.warn("parameter_grid is required for experiment with 
id: {}", experimentId);
+                throw new IllegalArgumentException("parameter_grid is required 
for experiment with id: " + experimentId);
+            }
+
+            batchId = UUID.randomUUID().toString();
+            JobBatchEntity batch = new JobBatchEntity();
+            batch.setId(batchId);
+            batch.setExperimentId(experimentId);
+            batch.setCommandTemplate(spec.getApplicationCommand());
+            batch.setPayloadJson(objectMapper.valueToTree(spec));
+            jobBatchRepo.save(batch);
+
+            jobBatchWorker.expandAndPersistUnitsAsync(experimentId, batchId, 
spec.getApplicationCommand(), spec.getParameterGrid());
+        }
+        return batchId;
+    }
+}
diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java
new file mode 100644
index 0000000000..ca8a97f37f
--- /dev/null
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/handlers/JobBatchWorker.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agent.connection.service.handlers;
+
+import jakarta.transaction.Transactional;
+import org.apache.airavata.agent.connection.service.db.entity.JobBatchEntity;
+import org.apache.airavata.agent.connection.service.db.entity.JobUnitEntity;
+import org.apache.airavata.agent.connection.service.db.repo.JobUnitRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@Service("jobBatchWorker")
+public class JobBatchWorker {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobBatchWorker.class);
+
+    private final JobUnitRepo jobUnitRepo;
+
+    public JobBatchWorker(JobUnitRepo jobUnitRepo) {
+        this.jobUnitRepo = jobUnitRepo;
+    }
+
+    /**
+     * Expands the parameter grid and persists JOB_UNIT rows in batch
+     */
+    @Async("batchExecutor")
+    @Transactional(dontRollbackOn = Exception.class)
+    public void expandAndPersistUnitsAsync(String experimentId, String batchId,
+                                           String commandTemplate, Map<String, 
List<String>> grid) {
+
+        if (grid == null || grid.isEmpty()) {
+            return;
+        }
+
+        List<String> keys = new ArrayList<>(grid.keySet());
+        Collections.sort(keys);
+
+        List<String[]> values = new ArrayList<>(keys.size());
+        for (String k : keys) {
+            List<String> vs = grid.get(k);
+            if (vs == null || vs.isEmpty()) {
+                LOGGER.warn("Parameter '{}' has empty value list; skipping 
batch {}", k, batchId);
+                return;
+            }
+            values.add(vs.toArray(new String[0]));
+        }
+
+        final int chunkSize = 100;
+        List<JobUnitEntity> buffer = new ArrayList<>(chunkSize);
+
+        int dims = values.size();
+        int[] idx = new int[dims];
+        boolean done = false;
+
+        while (!done) {
+            String resolved = renderCommand(commandTemplate, keys, values, 
idx);
+
+            JobUnitEntity jobUnit = new JobUnitEntity();
+            jobUnit.setId(UUID.randomUUID().toString());
+            jobUnit.setExperimentId(experimentId);
+            JobBatchEntity batchRef = new JobBatchEntity();
+            batchRef.setId(batchId);
+            jobUnit.setBatch(batchRef);
+            jobUnit.setResolvedCommand(resolved);
+            buffer.add(jobUnit);
+
+            if (buffer.size() >= chunkSize) {
+                jobUnitRepo.saveAll(buffer);
+                buffer.clear();
+            }
+
+            for (int d = dims - 1; d >= 0; d--) {
+                idx[d]++;
+                if (idx[d] < values.get(d).length) {
+                    break;
+                } else {
+                    idx[d] = 0;
+                    if (d == 0) {
+                        done = true;
+                    }
+                }
+            }
+        }
+
+        if (!buffer.isEmpty()) {
+            jobUnitRepo.saveAll(buffer);
+        }
+
+        LOGGER.info("Batch {} expansion complete (experiment {}).", batchId, 
experimentId);
+    }
+
+
+    private static String renderCommand(String template, List<String> keys, 
List<String[]> values, int[] idx) {
+        String cmd = template;
+        for (int i = 0; i < keys.size(); i++) {
+            String placeholder = "{" + keys.get(i) + "}";
+            cmd = cmd.replace(placeholder, values.get(i)[idx[i]]);
+        }
+        return cmd;
+    }
+}
diff --git 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java
 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java
index d3c6a8077d..ed5907b5da 100644
--- 
a/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java
+++ 
b/modules/agent-framework/agent-service/src/main/java/org/apache/airavata/agent/connection/service/models/AgentLaunchResponse.java
@@ -24,11 +24,13 @@ public class AgentLaunchResponse {
     private String experimentId;
     private String envName;
     private String processId;
+    private String batchId;
 
-    public AgentLaunchResponse(String agentId, String experimentId, String 
envName) {
+    public AgentLaunchResponse(String agentId, String experimentId, String 
envName, String batchId) {
         this.agentId = agentId;
         this.experimentId = experimentId;
         this.envName = envName;
+        this.batchId = batchId;
     }
 
     public String getAgentId() {
@@ -62,4 +64,12 @@ public class AgentLaunchResponse {
     public void setProcessId(String processId) {
         this.processId = processId;
     }
+
+    public String getBatchId() {
+        return batchId;
+    }
+
+    public void setBatchId(String batchId) {
+        this.batchId = batchId;
+    }
 }

Reply via email to