This is an automated email from the ASF dual-hosted git repository.
lahirujayathilake pushed a commit to branch airavata-aws
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/airavata-aws by this push:
new 9c900e99a0 refactor Pre/PostWorkflowManager to instantiate tasks via
TaskFactory
9c900e99a0 is described below
commit 9c900e99a03ecca1bb443f8129243d6abccb23e6
Author: lahiruj <[email protected]>
AuthorDate: Wed Jun 25 01:12:38 2025 -0400
refactor Pre/PostWorkflowManager to instantiate tasks via TaskFactory
---
.../airavata/helix/impl/task/HelixTaskFactory.java | 38 +++++
.../airavata/helix/impl/task/SlurmTaskFactory.java | 83 +++++++++++
.../airavata/helix/impl/task/TaskContext.java | 73 +++++----
.../airavata/helix/impl/task/TaskFactory.java | 41 +++++
.../helix/impl/workflow/PostWorkflowManager.java | 165 +++++++++++----------
.../helix/impl/workflow/PreWorkflowManager.java | 78 ++++++----
6 files changed, 339 insertions(+), 139 deletions(-)
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java
new file mode 100644
index 0000000000..1aff2199bc
--- /dev/null
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/HelixTaskFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+public interface HelixTaskFactory {
+
+ AiravataTask createEnvSetupTask(String processId);
+
+ AiravataTask createInputDataStagingTask(String processId);
+
+ AiravataTask createJobSubmissionTask(String processId);
+
+ AiravataTask createOutputDataStagingTask(String processId);
+
+ AiravataTask createArchiveTask(String processId);
+
+ AiravataTask createJobVerificationTask(String processId);
+
+ AiravataTask createCompletingTask(String processId);
+
+ AiravataTask createParsingTriggeringTask(String processId);
+}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
new file mode 100644
index 0000000000..b13a4a276c
--- /dev/null
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.helix.impl.task.completing.CompletingTask;
+import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
+import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
+import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
+import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlurmTaskFactory implements HelixTaskFactory {
+
+ private static final Logger logger =
LoggerFactory.getLogger(SlurmTaskFactory.class);
+
+ @Override
+ public AiravataTask createEnvSetupTask(String processId) {
+ logger.info("Creating Slurm EnvSetupTask for process {}...",
processId);
+ return new EnvSetupTask();
+ }
+
+ @Override
+ public AiravataTask createInputDataStagingTask(String processId) {
+ logger.info("Creating Slurm InputDataStagingTask for process {}...",
processId);
+ return new InputDataStagingTask();
+ }
+
+ @Override
+ public AiravataTask createJobSubmissionTask(String processId) {
+ logger.info("Creating Slurm DefaultJobSubmissionTask for process {}...", processId);
+ return new DefaultJobSubmissionTask();
+ }
+
+ @Override
+ public AiravataTask createOutputDataStagingTask(String processId) {
+ logger.info("Creating Slurm OutputDataStagingTask for process {}...",
processId);
+ return new OutputDataStagingTask();
+ }
+
+ @Override
+ public AiravataTask createArchiveTask(String processId) {
+ logger.info("Creating Slurm ArchiveTask for process {}...", processId);
+ return new ArchiveTask();
+ }
+
+ @Override
+ public AiravataTask createJobVerificationTask(String processId) {
+ logger.info("Creating Slurm JobVerificationTask for process {}...",
processId);
+ return new JobVerificationTask();
+ }
+
+ @Override
+ public AiravataTask createCompletingTask(String processId) {
+ logger.info("Creating Slurm CompletingTask for process {}...",
processId);
+ return new CompletingTask();
+ }
+
+ @Override
+ public AiravataTask createParsingTriggeringTask(String processId) {
+ logger.info("Creating Slurm ParsingTriggeringTask for process {}...",
processId);
+ return new ParsingTriggeringTask();
+ }
+}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 2e18ad5d92..d7892dff71 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -1,34 +1,23 @@
/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.airavata.helix.impl.task;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.Publisher;
@@ -75,6 +64,17 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
/**
* Note: process context property use lazy loading approach. In runtime you
will see some properties as null
* unless you have access it previously. Once that property access using the
api,it will be set to correct value.
@@ -111,6 +111,7 @@ public class TaskContext {
private UserComputeResourcePreference userComputeResourcePreference;
private UserStoragePreference userStoragePreference;
private GroupComputeResourcePreference groupComputeResourcePreference;
+ private ResourceType resourceType;
private ComputeResourceDescription computeResourceDescription;
private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -267,6 +268,14 @@ public class TaskContext {
this.groupComputeResourcePreference = groupComputeResourcePreference;
}
+ public ResourceType getResourceType() throws Exception {
+ if (resourceType == null) {
+ GroupComputeResourcePreference pref =
getGroupComputeResourcePreference();
+ resourceType = pref.getResourceType();
+ }
+ return resourceType;
+ }
+
public UserResourceProfile getUserResourceProfile() throws Exception {
if (userResourceProfile == null && processModel.isUseUserCRPref()) {
@@ -400,8 +409,8 @@ public class TaskContext {
if (outputDataObjectType.getValue() == null
|| outputDataObjectType.getValue().equals(""))
{
String stdOut =
(getWorkingDir().endsWith(File.separator)
- ? getWorkingDir()
- : getWorkingDir() + File.separator)
+ ? getWorkingDir()
+ : getWorkingDir() + File.separator)
+
getApplicationInterfaceDescription().getApplicationName() + ".stdout";
outputDataObjectType.setValue(stdOut);
stdoutLocation = stdOut;
@@ -429,8 +438,8 @@ public class TaskContext {
if (outputDataObjectType.getValue() == null
|| outputDataObjectType.getValue().equals(""))
{
String stderrLocation =
(getWorkingDir().endsWith(File.separator)
- ? getWorkingDir()
- : getWorkingDir() + File.separator)
+ ? getWorkingDir()
+ : getWorkingDir() + File.separator)
+
getApplicationInterfaceDescription().getApplicationName() + ".stderr";
outputDataObjectType.setValue(stderrLocation);
this.stderrLocation = stderrLocation;
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java
new file mode 100644
index 0000000000..1cd0ca6549
--- /dev/null
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public class TaskFactory {
+
+ private static final Map<ResourceType, HelixTaskFactory> FACTORIES = new
EnumMap<>(ResourceType.class);
+
+ static {
+ FACTORIES.put(ResourceType.SLURM, new SlurmTaskFactory());
+ }
+
+ public static HelixTaskFactory getFactory(ResourceType type) {
+ HelixTaskFactory factory = FACTORIES.get(type);
+ if (factory == null) {
+ throw new IllegalArgumentException("No TaskFactory for " + type);
+ }
+ return factory;
+ }
+}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index a34bbacee1..adc5a636de 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -1,39 +1,33 @@
/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.airavata.helix.impl.workflow;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.OutPort;
-import org.apache.airavata.helix.impl.task.*;
-import org.apache.airavata.helix.impl.task.completing.CompletingTask;
-import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
-import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
-import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
-import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.HelixTaskFactory;
+import org.apache.airavata.helix.impl.task.TaskFactory;
import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.messaging.event.JobIdentifier;
@@ -53,12 +47,31 @@ import
org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.airavata.patform.monitoring.MonitoringServer;
import org.apache.airavata.registry.api.RegistryService;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
public class PostWorkflowManager extends WorkflowManager {
private static final Logger logger =
LoggerFactory.getLogger(PostWorkflowManager.class);
@@ -221,10 +234,16 @@ public class PostWorkflowManager extends WorkflowManager {
ProcessModel processModel;
ExperimentModel experimentModel;
+ HelixTaskFactory taskFactory;
try {
processModel = registryClient.getProcess(processId);
experimentModel =
registryClient.getExperiment(processModel.getExperimentId());
getRegistryClientPool().returnResource(registryClient);
+ ResourceType resourceType = registryClient
+
.getGroupComputeResourcePreference(processModel.getComputeResourceId(),
processModel.getGroupResourceProfileId())
+ .getResourceType();
+ taskFactory = TaskFactory.getFactory(resourceType);
+ logger.info("Initialized task factory for resource type {} for process {}", resourceType, processId);
} catch (Exception e) {
logger.error(
@@ -240,7 +259,7 @@ public class PostWorkflowManager extends WorkflowManager {
String[] taskIds = taskDag.split(",");
final List<AiravataTask> allTasks = new ArrayList<>();
- JobVerificationTask jobVerificationTask = new JobVerificationTask();
+ AiravataTask jobVerificationTask =
taskFactory.createJobVerificationTask(processId);
jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
jobVerificationTask.setProcessId(processModel.getProcessId());
@@ -270,11 +289,11 @@ public class PostWorkflowManager extends WorkflowManager {
assert subTaskModel != null;
switch (subTaskModel.getType()) {
case OUPUT:
- airavataTask = new OutputDataStagingTask();
+ airavataTask =
taskFactory.createOutputDataStagingTask(processId);
airavataTask.setForceRunTask(true);
break;
case ARCHIVE_OUTPUT:
- airavataTask = new ArchiveTask();
+ airavataTask =
taskFactory.createArchiveTask(processId);
airavataTask.setForceRunTask(true);
break;
}
@@ -296,7 +315,7 @@ public class PostWorkflowManager extends WorkflowManager {
}
}
- CompletingTask completingTask = new CompletingTask();
+ AiravataTask completingTask =
taskFactory.createCompletingTask(processId);
completingTask.setGatewayId(experimentModel.getGatewayId());
completingTask.setExperimentId(experimentModel.getExperimentId());
completingTask.setProcessId(processModel.getProcessId());
@@ -308,7 +327,7 @@ public class PostWorkflowManager extends WorkflowManager {
}
allTasks.add(completingTask);
- ParsingTriggeringTask parsingTriggeringTask = new
ParsingTriggeringTask();
+ AiravataTask parsingTriggeringTask =
taskFactory.createParsingTriggeringTask(processId);
parsingTriggeringTask.setGatewayId(experimentModel.getGatewayId());
parsingTriggeringTask.setExperimentId(experimentModel.getExperimentId());
parsingTriggeringTask.setProcessId(processModel.getProcessId());
@@ -332,50 +351,43 @@ public class PostWorkflowManager extends WorkflowManager {
init();
final Consumer<String, JobStatusResult> consumer = createConsumer();
new Thread(() -> {
- while (true) {
-
- final ConsumerRecords<String, JobStatusResult>
consumerRecords = consumer.poll(Long.MAX_VALUE);
- CompletionService<Boolean> executorCompletionService =
- new
ExecutorCompletionService<>(processingPool);
- List<Future<Boolean>> processingFutures = new
ArrayList<>();
-
- for (TopicPartition partition :
consumerRecords.partitions()) {
- List<ConsumerRecord<String, JobStatusResult>>
partitionRecords =
- consumerRecords.records(partition);
- logger.info("Received job records {}",
partitionRecords.size());
-
- for (ConsumerRecord<String, JobStatusResult>
record : partitionRecords) {
- logger.info(
- "Submitting {} to process in thread pool",
- record.value().getJobId());
-
- // This avoids kafka read thread to wait until
processing is completed before committing
- // There is a risk of missing 20 messages in
case of a restart but this improves the
- // robustness
- // of the kafka read thread by avoiding wait
timeouts
-
processingFutures.add(executorCompletionService.submit(() -> {
- boolean success = process(record.value());
- logger.info("Status of processing "
- + record.value().getJobId() + " : " + success);
- return success;
- }));
-
- consumer.commitSync(Collections.singletonMap(
- partition, new
OffsetAndMetadata(record.offset() + 1)));
- }
- }
+ while (true) {
+
+ final ConsumerRecords<String, JobStatusResult> consumerRecords
= consumer.poll(Long.MAX_VALUE);
+ CompletionService<Boolean> executorCompletionService = new
ExecutorCompletionService<>(processingPool);
+ List<Future<Boolean>> processingFutures = new ArrayList<>();
+
+ for (TopicPartition partition : consumerRecords.partitions()) {
+ List<ConsumerRecord<String, JobStatusResult>>
partitionRecords = consumerRecords.records(partition);
+ logger.info("Received job records {}",
partitionRecords.size());
+
+ for (ConsumerRecord<String, JobStatusResult> record :
partitionRecords) {
+ logger.info("Submitting {} to process in thread pool",
record.value().getJobId());
+
+ // This avoids kafka read thread to wait until processing is completed before committing
+ // There is a risk of missing 20 messages in case of a restart but this improves the
+ // robustness
+ // of the kafka read thread by avoiding wait timeouts
+
processingFutures.add(executorCompletionService.submit(() -> {
+ boolean success = process(record.value());
+ logger.info("Status of processing " +
record.value().getJobId() + " : " + success);
+ return success;
+ }));
+
+
consumer.commitSync(Collections.singletonMap(partition, new
OffsetAndMetadata(record.offset() + 1)));
+ }
+ }
- for (Future<Boolean> f : processingFutures) {
- try {
- executorCompletionService.take().get();
- } catch (Exception e) {
- logger.error("Failed processing job", e);
- }
- }
- logger.info("All messages processed. Moving to next round");
+ for (Future<Boolean> f : processingFutures) {
+ try {
+ executorCompletionService.take().get();
+ } catch (Exception e) {
+ logger.error("Failed processing job", e);
}
- })
- .start();
+ }
+ logger.info("All messages processed. Moving to next round");
+ }
+ }).start();
}
private void saveAndPublishJobStatus(
@@ -419,7 +431,8 @@ public class PostWorkflowManager extends WorkflowManager {
}
}
- public void stopServer() {}
+ public void stopServer() {
+ }
public static void main(String[] args) throws Exception {
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 971c5be267..d533bfa2a0 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -1,26 +1,23 @@
/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.airavata.helix.impl.workflow;
-import java.util.*;
-import java.util.stream.Collectors;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
@@ -28,17 +25,22 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.HelixTaskFactory;
+import org.apache.airavata.helix.impl.task.TaskFactory;
import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
import org.apache.airavata.helix.impl.task.completing.CompletingTask;
-import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
-import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
-import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
-import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
-import org.apache.airavata.messaging.core.*;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.process.ProcessWorkflow;
import org.apache.airavata.model.status.ProcessState;
@@ -53,6 +55,13 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
public class PreWorkflowManager extends WorkflowManager {
private static final Logger logger =
LoggerFactory.getLogger(PreWorkflowManager.class);
@@ -71,7 +80,8 @@ public class PreWorkflowManager extends WorkflowManager {
initLaunchSubscriber();
}
- public void stopServer() {}
+ public void stopServer() {
+ }
private void initLaunchSubscriber() throws AiravataException {
List<String> routingKeys = new ArrayList<>();
@@ -87,10 +97,16 @@ public class PreWorkflowManager extends WorkflowManager {
ProcessModel processModel;
ExperimentModel experimentModel;
+ HelixTaskFactory taskFactory;
try {
processModel = registryClient.getProcess(processId);
experimentModel =
registryClient.getExperiment(processModel.getExperimentId());
getRegistryClientPool().returnResource(registryClient);
+ ResourceType resourceType = registryClient
+
.getGroupComputeResourcePreference(processModel.getComputeResourceId(),
processModel.getGroupResourceProfileId())
+ .getResourceType();
+ taskFactory = TaskFactory.getFactory(resourceType);
+ logger.info("Initialized task factory for resource type {} for process {}", resourceType, processId);
} catch (Exception e) {
logger.error(
@@ -126,21 +142,21 @@ public class PreWorkflowManager extends WorkflowManager {
if (intermediateTransfer) {
if (taskModel.getTaskType() == TaskTypes.OUTPUT_FETCHING) {
- airavataTask = new OutputDataStagingTask();
+ airavataTask =
taskFactory.createOutputDataStagingTask(processId);
airavataTask.setForceRunTask(true);
airavataTask.setSkipExperimentStatusPublish(true);
}
} else if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
- airavataTask = new EnvSetupTask();
+ airavataTask = taskFactory.createEnvSetupTask(processId);
airavataTask.setForceRunTask(true);
} else if (taskModel.getTaskType() ==
TaskTypes.JOB_SUBMISSION) {
- airavataTask = new DefaultJobSubmissionTask();
+ airavataTask =
taskFactory.createJobSubmissionTask(processId);
airavataTask.setForceRunTask(forceRun);
jobSubmissionFound = true;
} else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
if (!jobSubmissionFound) {
- airavataTask = new InputDataStagingTask();
+ airavataTask =
taskFactory.createInputDataStagingTask(processId);
airavataTask.setForceRunTask(true);
}
}