This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch airavata-aws
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/airavata-aws by this push:
     new 9c900e99a0 refactor Pre/PostWorkflowManager to instantiate tasks via TaskFactory
9c900e99a0 is described below

commit 9c900e99a03ecca1bb443f8129243d6abccb23e6
Author: lahiruj <[email protected]>
AuthorDate: Wed Jun 25 01:12:38 2025 -0400

    refactor Pre/PostWorkflowManager to instantiate tasks via TaskFactory
---
 .../airavata/helix/impl/task/HelixTaskFactory.java |  38 +++++
 .../airavata/helix/impl/task/SlurmTaskFactory.java |  83 +++++++++++
 .../airavata/helix/impl/task/TaskContext.java      |  73 +++++----
 .../airavata/helix/impl/task/TaskFactory.java      |  41 +++++
 .../helix/impl/workflow/PostWorkflowManager.java   | 165 +++++++++++----------
 .../helix/impl/workflow/PreWorkflowManager.java    |  78 ++++++----
 6 files changed, 339 insertions(+), 139 deletions(-)
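
In short, both workflow managers now resolve a resource-type-specific HelixTaskFactory from the registry's group compute resource preference and ask it for task instances, instead of constructing concrete task classes directly. A condensed sketch of the pattern as it appears in the managers below (local variable names are illustrative):

    ResourceType resourceType = registryClient
            .getGroupComputeResourcePreference(processModel.getComputeResourceId(),
                    processModel.getGroupResourceProfileId())
            .getResourceType();
    HelixTaskFactory taskFactory = TaskFactory.getFactory(resourceType);
    AiravataTask envSetupTask = taskFactory.createEnvSetupTask(processId);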

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java
new file mode 100644
index 0000000000..1aff2199bc
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+public interface HelixTaskFactory {
+
+    AiravataTask createEnvSetupTask(String processId);
+
+    AiravataTask createInputDataStagingTask(String processId);
+
+    AiravataTask createJobSubmissionTask(String processId);
+
+    AiravataTask createOutputDataStagingTask(String processId);
+
+    AiravataTask createArchiveTask(String processId);
+
+    AiravataTask createJobVerificationTask(String processId);
+
+    AiravataTask createCompletingTask(String processId);
+
+    AiravataTask createParsingTriggeringTask(String processId);
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
new file mode 100644
index 0000000000..b13a4a276c
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
+import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
+import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
+import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlurmTaskFactory implements HelixTaskFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(SlurmTaskFactory.class);
+
+    @Override
+    public AiravataTask createEnvSetupTask(String processId) {
+        logger.info("Creating Slurm EnvSetupTask for process {}...", 
processId);
+        return new EnvSetupTask();
+    }
+
+    @Override
+    public AiravataTask createInputDataStagingTask(String processId) {
+        logger.info("Creating Slurm InputDataStagingTask for process {}...", 
processId);
+        return new InputDataStagingTask();
+    }
+
+    @Override
+    public AiravataTask createJobSubmissionTask(String processId) {
+        logger.info("Creating Slurm DefaultJobSubmissionTask for process 
{}...", processId);
+        return new DefaultJobSubmissionTask();
+    }
+
+    @Override
+    public AiravataTask createOutputDataStagingTask(String processId) {
+        logger.info("Creating Slurm OutputDataStagingTask for process {}...", 
processId);
+        return new OutputDataStagingTask();
+    }
+
+    @Override
+    public AiravataTask createArchiveTask(String processId) {
+        logger.info("Creating Slurm ArchiveTask for process {}...", processId);
+        return new ArchiveTask();
+    }
+
+    @Override
+    public AiravataTask createJobVerificationTask(String processId) {
+        logger.info("Creating Slurm JobVerificationTask for process {}...", 
processId);
+        return new JobVerificationTask();
+    }
+
+    @Override
+    public AiravataTask createCompletingTask(String processId) {
+        logger.info("Creating Slurm CompletingTask for process {}...", 
processId);
+        return new CompletingTask();
+    }
+
+    @Override
+    public AiravataTask createParsingTriggeringTask(String processId) {
+        logger.info("Creating Slurm ParsingTriggeringTask for process {}...", 
processId);
+        return new ParsingTriggeringTask();
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 2e18ad5d92..d7892dff71 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -1,34 +1,23 @@
 /**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.airavata.helix.impl.task;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.Publisher;
@@ -75,6 +64,17 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * Note: process context property use lazy loading approach. In runtime you will see some properties as null
  * unless you have access it previously. Once that property access using the api,it will be set to correct value.
@@ -111,6 +111,7 @@ public class TaskContext {
     private UserComputeResourcePreference userComputeResourcePreference;
     private UserStoragePreference userStoragePreference;
     private GroupComputeResourcePreference groupComputeResourcePreference;
+    private ResourceType resourceType;
 
     private ComputeResourceDescription computeResourceDescription;
     private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -267,6 +268,14 @@ public class TaskContext {
         this.groupComputeResourcePreference = groupComputeResourcePreference;
     }
 
+    public ResourceType getResourceType() throws Exception {
+        if (resourceType == null) {
+            GroupComputeResourcePreference pref = getGroupComputeResourcePreference();
+            resourceType = pref.getResourceType();
+        }
+        return resourceType;
+    }
+
     public UserResourceProfile getUserResourceProfile() throws Exception {
 
         if (userResourceProfile == null && processModel.isUseUserCRPref()) {
@@ -400,8 +409,8 @@ public class TaskContext {
                         if (outputDataObjectType.getValue() == null
                                 || outputDataObjectType.getValue().equals("")) {
                             String stdOut = (getWorkingDir().endsWith(File.separator)
-                                            ? getWorkingDir()
-                                            : getWorkingDir() + File.separator)
+                                    ? getWorkingDir()
+                                    : getWorkingDir() + File.separator)
                                     + getApplicationInterfaceDescription().getApplicationName() + ".stdout";
                             outputDataObjectType.setValue(stdOut);
                             stdoutLocation = stdOut;
@@ -429,8 +438,8 @@ public class TaskContext {
                         if (outputDataObjectType.getValue() == null
                                 || outputDataObjectType.getValue().equals("")) {
                             String stderrLocation = (getWorkingDir().endsWith(File.separator)
-                                            ? getWorkingDir()
-                                            : getWorkingDir() + File.separator)
+                                    ? getWorkingDir()
+                                    : getWorkingDir() + File.separator)
                                     + getApplicationInterfaceDescription().getApplicationName() + ".stderr";
                             outputDataObjectType.setValue(stderrLocation);
                             this.stderrLocation = stderrLocation;
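
The new TaskContext#getResourceType() accessor follows the class's existing lazy-loading convention noted in its javadoc: the value is read from the group compute resource preference on first access and cached afterwards. A hypothetical usage from task code (not part of this commit; assumes a TaskContext instance named taskContext) could look like:

    // Hypothetical: resolve the factory from the lazily loaded, cached value
    ResourceType type = taskContext.getResourceType();
    HelixTaskFactory factory = TaskFactory.getFactory(type);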
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java
new file mode 100644
index 0000000000..1cd0ca6549
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public class TaskFactory {
+
+    private static final Map<ResourceType, HelixTaskFactory> FACTORIES = new EnumMap<>(ResourceType.class);
+
+    static {
+        FACTORIES.put(ResourceType.SLURM, new SlurmTaskFactory());
+    }
+
+    public static HelixTaskFactory getFactory(ResourceType type) {
+        HelixTaskFactory factory = FACTORIES.get(type);
+        if (factory == null) {
+            throw new IllegalArgumentException("No TaskFactory for " + type);
+        }
+        return factory;
+    }
+}
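
TaskFactory keeps a static EnumMap from ResourceType to factory implementations, so supporting another resource type would only mean registering an additional entry. A hypothetical sketch, assuming a future AwsTaskFactory that is not part of this commit:

    static {
        FACTORIES.put(ResourceType.SLURM, new SlurmTaskFactory());
        // hypothetical future registration, e.g. for this branch's AWS work:
        // FACTORIES.put(ResourceType.AWS, new AwsTaskFactory());
    }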
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index a34bbacee1..adc5a636de 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -1,39 +1,33 @@
 /**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.airavata.helix.impl.workflow;
 
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.*;
-import org.apache.airavata.helix.impl.task.completing.CompletingTask;
-import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
-import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
-import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
-import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.HelixTaskFactory;
+import org.apache.airavata.helix.impl.task.TaskFactory;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
@@ -53,12 +47,31 @@ import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
 import org.apache.airavata.patform.monitoring.CountMonitor;
 import org.apache.airavata.patform.monitoring.MonitoringServer;
 import org.apache.airavata.registry.api.RegistryService;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
 public class PostWorkflowManager extends WorkflowManager {
 
     private static final Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
@@ -221,10 +234,16 @@ public class PostWorkflowManager extends WorkflowManager {
 
         ProcessModel processModel;
         ExperimentModel experimentModel;
+        HelixTaskFactory taskFactory;
         try {
             processModel = registryClient.getProcess(processId);
             experimentModel = registryClient.getExperiment(processModel.getExperimentId());
             getRegistryClientPool().returnResource(registryClient);
+            ResourceType resourceType = registryClient
+                    .getGroupComputeResourcePreference(processModel.getComputeResourceId(), processModel.getGroupResourceProfileId())
+                    .getResourceType();
+            taskFactory = TaskFactory.getFactory(resourceType);
+            logger.info("Initialized task factory for resource type {} for process {}", resourceType, processId);
 
         } catch (Exception e) {
             logger.error(
@@ -240,7 +259,7 @@ public class PostWorkflowManager extends WorkflowManager {
         String[] taskIds = taskDag.split(",");
         final List<AiravataTask> allTasks = new ArrayList<>();
 
-        JobVerificationTask jobVerificationTask = new JobVerificationTask();
+        AiravataTask jobVerificationTask = taskFactory.createJobVerificationTask(processId);
         jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
         jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
         jobVerificationTask.setProcessId(processModel.getProcessId());
@@ -270,11 +289,11 @@ public class PostWorkflowManager extends WorkflowManager {
                         assert subTaskModel != null;
                         switch (subTaskModel.getType()) {
                             case OUPUT:
-                                airavataTask = new OutputDataStagingTask();
+                                airavataTask = taskFactory.createOutputDataStagingTask(processId);
                                 airavataTask.setForceRunTask(true);
                                 break;
                             case ARCHIVE_OUTPUT:
-                                airavataTask = new ArchiveTask();
+                                airavataTask = taskFactory.createArchiveTask(processId);
                                 airavataTask.setForceRunTask(true);
                                 break;
                         }
@@ -296,7 +315,7 @@ public class PostWorkflowManager extends WorkflowManager {
             }
         }
 
-        CompletingTask completingTask = new CompletingTask();
+        AiravataTask completingTask = taskFactory.createCompletingTask(processId);
         completingTask.setGatewayId(experimentModel.getGatewayId());
         completingTask.setExperimentId(experimentModel.getExperimentId());
         completingTask.setProcessId(processModel.getProcessId());
@@ -308,7 +327,7 @@ public class PostWorkflowManager extends WorkflowManager {
         }
         allTasks.add(completingTask);
 
-        ParsingTriggeringTask parsingTriggeringTask = new ParsingTriggeringTask();
+        AiravataTask parsingTriggeringTask = taskFactory.createParsingTriggeringTask(processId);
         parsingTriggeringTask.setGatewayId(experimentModel.getGatewayId());
         parsingTriggeringTask.setExperimentId(experimentModel.getExperimentId());
         parsingTriggeringTask.setProcessId(processModel.getProcessId());
@@ -332,50 +351,43 @@ public class PostWorkflowManager extends WorkflowManager {
         init();
         final Consumer<String, JobStatusResult> consumer = createConsumer();
         new Thread(() -> {
-                    while (true) {
-
-                        final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
-                        CompletionService<Boolean> executorCompletionService =
-                                new ExecutorCompletionService<>(processingPool);
-                        List<Future<Boolean>> processingFutures = new ArrayList<>();
-
-                        for (TopicPartition partition : consumerRecords.partitions()) {
-                            List<ConsumerRecord<String, JobStatusResult>> partitionRecords =
-                                    consumerRecords.records(partition);
-                            logger.info("Received job records {}", partitionRecords.size());
-
-                            for (ConsumerRecord<String, JobStatusResult> record : partitionRecords) {
-                                logger.info(
-                                        "Submitting {} to process in thread pool",
-                                        record.value().getJobId());
-
-                                // This avoids kafka read thread to wait until processing is completed before committing
-                                // There is a risk of missing 20 messages in case of a restart but this improves the
-                                // robustness
-                                // of the kafka read thread by avoiding wait timeouts
-                                processingFutures.add(executorCompletionService.submit(() -> {
-                                    boolean success = process(record.value());
-                                    logger.info("Status of processing "
-                                            + record.value().getJobId() + " : " + success);
-                                    return success;
-                                }));
-
-                                consumer.commitSync(Collections.singletonMap(
-                                        partition, new OffsetAndMetadata(record.offset() + 1)));
-                            }
-                        }
+            while (true) {
+
+                final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(Long.MAX_VALUE);
+                CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(processingPool);
+                List<Future<Boolean>> processingFutures = new ArrayList<>();
+
+                for (TopicPartition partition : consumerRecords.partitions()) {
+                    List<ConsumerRecord<String, JobStatusResult>> partitionRecords = consumerRecords.records(partition);
+                    logger.info("Received job records {}", partitionRecords.size());
+
+                    for (ConsumerRecord<String, JobStatusResult> record : partitionRecords) {
+                        logger.info("Submitting {} to process in thread pool", record.value().getJobId());
+
+                        // This avoids kafka read thread to wait until processing is completed before committing
+                        // There is a risk of missing 20 messages in case of a restart but this improves the
+                        // robustness
+                        // of the kafka read thread by avoiding wait timeouts
+                        processingFutures.add(executorCompletionService.submit(() -> {
+                            boolean success = process(record.value());
+                            logger.info("Status of processing " + record.value().getJobId() + " : " + success);
+                            return success;
+                        }));
+
+                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
+                    }
+                }
 
-                        for (Future<Boolean> f : processingFutures) {
-                            try {
-                                executorCompletionService.take().get();
-                            } catch (Exception e) {
-                                logger.error("Failed processing job", e);
-                            }
-                        }
-                        logger.info("All messages processed. Moving to next 
round");
+                for (Future<Boolean> f : processingFutures) {
+                    try {
+                        executorCompletionService.take().get();
+                    } catch (Exception e) {
+                        logger.error("Failed processing job", e);
                     }
-                })
-                .start();
+                }
+                logger.info("All messages processed. Moving to next round");
+            }
+        }).start();
     }
 
     private void saveAndPublishJobStatus(
@@ -419,7 +431,8 @@ public class PostWorkflowManager extends WorkflowManager {
         }
     }
 
-    public void stopServer() {}
+    public void stopServer() {
+    }
 
     public static void main(String[] args) throws Exception {
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 971c5be267..d533bfa2a0 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -1,26 +1,23 @@
 /**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.airavata.helix.impl.workflow;
 
-import java.util.*;
-import java.util.stream.Collectors;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -28,17 +25,22 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.HelixTaskFactory;
+import org.apache.airavata.helix.impl.task.TaskFactory;
 import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
 import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
 import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
 import org.apache.airavata.helix.impl.task.completing.CompletingTask;
-import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
-import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
-import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
-import org.apache.airavata.messaging.core.*;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
 import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.process.ProcessWorkflow;
 import org.apache.airavata.model.status.ProcessState;
@@ -53,6 +55,13 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
 public class PreWorkflowManager extends WorkflowManager {
 
     private static final Logger logger = LoggerFactory.getLogger(PreWorkflowManager.class);
@@ -71,7 +80,8 @@ public class PreWorkflowManager extends WorkflowManager {
         initLaunchSubscriber();
     }
 
-    public void stopServer() {}
+    public void stopServer() {
+    }
 
     private void initLaunchSubscriber() throws AiravataException {
         List<String> routingKeys = new ArrayList<>();
@@ -87,10 +97,16 @@ public class PreWorkflowManager extends WorkflowManager {
 
         ProcessModel processModel;
         ExperimentModel experimentModel;
+        HelixTaskFactory taskFactory;
         try {
             processModel = registryClient.getProcess(processId);
             experimentModel = registryClient.getExperiment(processModel.getExperimentId());
             getRegistryClientPool().returnResource(registryClient);
+            ResourceType resourceType = registryClient
+                    .getGroupComputeResourcePreference(processModel.getComputeResourceId(), processModel.getGroupResourceProfileId())
+                    .getResourceType();
+            taskFactory = TaskFactory.getFactory(resourceType);
+            logger.info("Initialized task factory for resource type {} for process {}", resourceType, processId);
 
         } catch (Exception e) {
             logger.error(
@@ -126,21 +142,21 @@ public class PreWorkflowManager extends WorkflowManager {
 
                 if (intermediateTransfer) {
                     if (taskModel.getTaskType() == TaskTypes.OUTPUT_FETCHING) {
-                        airavataTask = new OutputDataStagingTask();
+                        airavataTask = taskFactory.createOutputDataStagingTask(processId);
                         airavataTask.setForceRunTask(true);
                         airavataTask.setSkipExperimentStatusPublish(true);
                     }
 
                 } else if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
-                    airavataTask = new EnvSetupTask();
+                    airavataTask = taskFactory.createEnvSetupTask(processId);
                     airavataTask.setForceRunTask(true);
                 } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) {
-                    airavataTask = new DefaultJobSubmissionTask();
+                    airavataTask = taskFactory.createJobSubmissionTask(processId);
                     airavataTask.setForceRunTask(forceRun);
                     jobSubmissionFound = true;
                 } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
                     if (!jobSubmissionFound) {
-                        airavataTask = new InputDataStagingTask();
+                        airavataTask = taskFactory.createInputDataStagingTask(processId);
                         airavataTask.setForceRunTask(true);
                     }
                 }

