Fixed all compilation issues in the workflow-core module
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ce795581 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ce795581 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ce795581 Branch: refs/heads/master Commit: ce7955813f79812a58bc6395914f3a978b0ae1e9 Parents: e21fae7 Author: Shameera Rathnayaka <[email protected]> Authored: Fri Feb 5 16:29:41 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Fri Feb 5 16:29:41 2016 -0500 ---------------------------------------------------------------------- .../cpi/impl/SimpleOrchestratorImpl.java | 3 +- .../server/OrchestratorServerHandler.java | 75 +++---- modules/workflow/workflow-core/pom.xml | 6 + .../core/SimpleWorkflowInterpreter.java | 197 ++++++------------- .../airavata/workflow/core/WorkflowContext.java | 60 ------ .../workflow/core/WorkflowEnactmentService.java | 2 +- .../airavata/workflow/core/WorkflowFactory.java | 1 - pom.xml | 1 + 8 files changed, 105 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index ff515ef..c875180 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -269,11 +269,10 @@ public class SimpleOrchestratorImpl 
extends AbstractOrchestrator{ return processModels; } - public String createAndSaveTasks(String gatewayId, ExperimentModel experimentModel, ProcessModel processModel) throws OrchestratorException { + public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException { try { ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog(); AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog(); - boolean autoSchedule = experimentModel.getUserConfigurationData().isAiravataAutoSchedule(); ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule(); String userGivenQueueName = resourceSchedule.getQueueName(); int userGivenWallTime = resourceSchedule.getWallTimeLimit(); http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index a461ba4..977191e 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -136,45 +136,46 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { ExperimentModel experiment = null; try { - List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId); - experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); - for (ProcessModel 
processModel : processes){ - String taskDag = orchestrator.createAndSaveTasks(gatewayId, experiment, processModel); - processModel.setTaskDag(taskDag); - experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId()); - } - if (experiment == null) { - log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId); - return false; - } + String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); + String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); - if (!validateProcess(experimentId, processes)) { - log.error("Validating process fails for given experiment Id : {}", experimentId); - return false; - } - ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile(). + ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile(). getComputeResourcePreference(gatewayId, experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId()); - String token = computeResourcePreference.getResourceSpecificCredentialStoreToken(); - if (token == null || token.isEmpty()){ - // try with gateway profile level token - GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId); - token = gatewayProfile.getCredentialStoreToken(); - } - // still the token is empty, then we fail the experiment - if (token == null || token.isEmpty()){ - log.error("You have not configured credential store token at gateway profile or compute resource preference. 
Please provide the correct token at gateway profile or compute resource preference."); - return false; - } - String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); - String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); + String token = computeResourcePreference.getResourceSpecificCredentialStoreToken(); + if (token == null || token.isEmpty()){ + // try with gateway profile level token + GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId); + token = gatewayProfile.getCredentialStoreToken(); + } + // still the token is empty, then we fail the experiment + if (token == null || token.isEmpty()){ + log.error("You have not configured credential store token at gateway profile or compute resource preference. 
Please provide the correct token at gateway profile or compute resource preference."); + return false; + } + ExperimentType executionType = experiment.getExperimentType(); + if (executionType == ExperimentType.SINGLE_APPLICATION) { + //its an single application execution experiment + List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId); + experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + if (experiment == null) { + log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId); + return false; + } + for (ProcessModel processModel : processes){ + String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel, experiment.getUserConfigurationData().isAiravataAutoSchedule()); + processModel.setTaskDag(taskDag); + experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId()); + } + + if (!validateProcess(experimentId, processes)) { + log.error("Validating process fails for given experiment Id : {}", experimentId); + return false; + } - ExperimentType executionType = experiment.getExperimentType(); - if (executionType == ExperimentType.SINGLE_APPLICATION) { - //its an single application execution experiment - log.debug(experimentId, "Launching single application experiment {}.", experimentId); + log.debug(experimentId, "Launching single application experiment {}.", experimentId); ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); status.setReason("submitted all processes"); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); @@ -184,7 +185,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } else if (executionType == ExperimentType.WORKFLOW) { //its a workflow execution experiment log.debug(experimentId, "Launching workflow experiment {}.", experimentId); - launchWorkflowExperiment(experimentId, token); + 
launchWorkflowExperiment(experimentId, token, gatewayId); } else { log.error(experimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", experimentId); throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId()); @@ -367,7 +368,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } } - private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { + private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken, String gatewayId) throws TException { // FIXME // try { // WorkflowEnactmentService.getInstance(). http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml index bb1ea79..72990c8 100644 --- a/modules/workflow/workflow-core/pom.xml +++ b/modules/workflow/workflow-core/pom.xml @@ -31,6 +31,12 @@ <version>${project.version}</version> </dependency> + <!--Workflow Interpreter dependency--> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-gfac-core</artifactId> + <version>${project.version}</version> + </dependency> <!-- Airavata default parser dependency --> <dependency> <groupId>org.apache.airavata</groupId> http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java index 
7f8a8a5..cdbf2f2 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java @@ -22,35 +22,25 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.ComponentState; +import org.apache.airavata.model.ComponentStatus; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.model.messaging.event.*; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.apache.airavata.model.status.ProcessState; -import org.apache.airavata.model.util.ExperimentModelUtil; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.core.experiment.catalog.model.Experiment; import org.apache.airavata.registry.cpi.*; import org.apache.airavata.workflow.core.dag.edge.Edge; -import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; -import org.apache.airavata.workflow.core.dag.nodes.NodeState; -import org.apache.airavata.workflow.core.dag.nodes.InputNode; -import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; -import org.apache.airavata.workflow.core.dag.nodes.OutputNode; -import org.apache.airavata.workflow.core.dag.port.InPort; +import org.apache.airavata.workflow.core.dag.nodes.*; import 
org.apache.airavata.workflow.core.dag.port.OutPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -68,12 +58,12 @@ class SimpleWorkflowInterpreter{ private String gatewayName; private String workflowString; - private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>(); - private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); - private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>(); - private Map<String, WorkflowContext> completeList = new HashMap<String, WorkflowContext>(); + private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> completeList = new HashMap<>(); private Registry registry; - private List<OutputNode> completeWorkflowOutputs = new ArrayList<OutputNode>(); + private List<OutputNode> completeWorkflowOutputs = new ArrayList<>(); private RabbitMQProcessLaunchPublisher publisher; private RabbitMQStatusConsumer statusConsumer; private String consumerId; @@ -120,10 +110,11 @@ class SimpleWorkflowInterpreter{ processReadyList(); } - private String getWorkflow() throws AppCatalogException { + private String getWorkflow() throws AppCatalogException, WorkflowCatalogException { WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog(); //FIXME: parse workflowTemplateId or experimentId - workflowCatalog.getWorkflow(""); +// workflowCatalog.getWorkflow(""); + return ""; } // try to remove synchronization tag @@ -141,15 +132,16 @@ class SimpleWorkflowInterpreter{ OutputNode 
outputNode = (OutputNode) readyNode; outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue()); addToCompleteOutputNodeList(outputNode); - continue; + } else if (readyNode instanceof InputNode) { + // set input object of applications and add applications to ready List. + } else if (readyNode instanceof ApplicationNode) { + // call orchestrator to create process for the application + } else { + throw new RuntimeException("Unsupported workflow node type"); } - WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); - TaskDetails process = getProcess(workflowNodeDetails); - WorkflowContext workflowContext = new WorkflowContext(readyNode, workflowNodeDetails, process); - addToProcessingQueue(workflowContext); - publishToProcessQueue(process); } - if (processingQueue.isEmpty()) { + + if (processingQueue.isEmpty() && waitingList.isEmpty()) { try { saveWorkflowOutputs(); } catch (AppCatalogException e) { @@ -159,64 +151,16 @@ class SimpleWorkflowInterpreter{ } private void saveWorkflowOutputs() throws AppCatalogException { - List<OutputDataObjectType> outputDataObjects = new ArrayList<OutputDataObjectType>(); + List<OutputDataObjectType> outputDataObjects = new ArrayList<>(); for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) { outputDataObjects.add(completeWorkflowOutput.getOutputObject()); } - RegistryFactory.getAppCatalog().getWorkflowCatalog() - .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects); - } - - - private void publishToProcessQueue(TaskDetails process) throws AiravataException { - ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(); - processSubmitEvent.setCredentialToken(credentialToken); - processSubmitEvent.setTaskId(process.getTaskID()); - MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - 
publisher.publish(messageContext); - } - - private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { - // create workflow taskDetails from workflowNodeDetails - TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails); - taskDetails.setTaskID(getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString()); - return taskDetails; - } - - private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException { - WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null); - ExecutionUnit executionUnit = ExecutionUnit.APPLICATION; - String executionData = null; - if (readyNode instanceof ApplicationNode) { - executionUnit = ExecutionUnit.APPLICATION; - executionData = ((ApplicationNode) readyNode).getApplicationId(); - setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails); - } else if (readyNode instanceof InputNode) { - executionUnit = ExecutionUnit.INPUT; - } else if (readyNode instanceof OutputNode) { - executionUnit = ExecutionUnit.OUTPUT; - } - wfNodeDetails.setExecutionUnit(executionUnit); - wfNodeDetails.setExecutionUnitData(executionData); - wfNodeDetails.setNodeInstanceId((String) getRegistry().getExperimentCatalog().add(ExpCatChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID())); - return wfNodeDetails; +// RegistryFactory.getAppCatalog().getWorkflowCatalog() +// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects); } - private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) { - if (readyAppNode.isReady()) { - for (InPort inPort : readyAppNode.getInputPorts()) { - wfNodeDetails.addToNodeInputs(inPort.getInputObject()); - } - } else { - throw new IllegalArgumentException("Application node should be in ready state to set 
inputs to the " + - "workflow node details, nodeId = " + readyAppNode.getId()); - } - } - - private void processWorkflowInputNodes(List<InputNode> inputNodes) { - Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); + Set<WorkflowNode> tempNodeSet = new HashSet<>(); for (InputNode inputNode : inputNodes) { if (inputNode.isReady()) { log.debug("Workflow node : " + inputNode.getId() + " is ready to execute"); @@ -251,16 +195,6 @@ class SimpleWorkflowInterpreter{ return registry; } - public Experiment getExperiment() { - return experiment; - } - - private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{ - WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state); - wfNodeDetails.setWorkflowNodeStatus(status); - getRegistry().getExperimentCatalog().update(ExperimentCatalogModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId()); - } - /** * Package-Private method. * Remove the workflow node from waiting queue and add it to the ready queue. @@ -278,16 +212,16 @@ class SimpleWorkflowInterpreter{ /** * First remove the node from ready list and then add the WfNodeContainer to the process queue. * Note that underline data structure of the process queue is a Map. 
- * @param workflowContext - has both workflow and correspond workflowNodeDetails and TaskDetails + * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails */ - private synchronized void addToProcessingQueue(WorkflowContext workflowContext) { - readyList.remove(workflowContext.getWorkflowNode().getId()); - processingQueue.put(workflowContext.getTaskDetails().getTaskID(), workflowContext); + private synchronized void addToProcessingQueue(ApplicationNode applicationNode) { + readyList.remove(applicationNode.getId()); + processingQueue.put(applicationNode.getId(), applicationNode); } - private synchronized void addToCompleteQueue(WorkflowContext workflowContext) { - processingQueue.remove(workflowContext.getTaskDetails().getTaskID()); - completeList.put(workflowContext.getTaskDetails().getTaskID(), workflowContext); + private synchronized void addToCompleteQueue(ApplicationNode applicationNode) { + processingQueue.remove(applicationNode.getId()); + completeList.put(applicationNode.getId(), applicationNode); } @@ -301,19 +235,18 @@ class SimpleWorkflowInterpreter{ } private void setExperiment(String experimentId) throws RegistryException { - experiment = (Experiment) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); log.debug("Retrieve Experiment for experiment id : " + experimentId); } - synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) { +/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) { String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); log.debug("Task Output changed event received for workflow node : " + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); - WorkflowContext workflowContext = 
processingQueue.get(taskId); - Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); - if (workflowContext != null) { - WorkflowNode workflowNode = workflowContext.getWorkflowNode(); + WorkflowNode workflowNode = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<>(); + if (workflowNode != null) { if (workflowNode instanceof ApplicationNode) { ApplicationNode applicationNode = (ApplicationNode) workflowNode; // Workflow node can have one to many output ports and each output port can have one to many links @@ -331,9 +264,9 @@ class SimpleWorkflowInterpreter{ } } } + addToCompleteQueue(applicationNode); + log.debug("removed task from processing queue : " + taskId); } - addToCompleteQueue(workflowContext); - log.debug("removed task from processing queue : " + taskId); try { processReadyList(); } catch (Exception e) { @@ -341,69 +274,55 @@ class SimpleWorkflowInterpreter{ continueWorkflow = false; } } - } + }*/ void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) { ProcessState processState = processStatusChangeEvent.getState(); ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity(); String processId = processIdentity.getProcessId(); - WorkflowContext workflowContext = processingQueue.get(processId); - if (workflowContext != null) { - WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED; + ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId); + if (applicationNode != null) { + ComponentState state = applicationNode.getState(); switch (processState) { case CREATED: case VALIDATED: case STARTED: break; case CONFIGURING_WORKSPACE: - wfNodeState = WorkflowNodeState.COMPLETED; - break; case PRE_PROCESSING: - wfNodeState = WorkflowNodeState.INVOKED; - workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; case INPUT_DATA_STAGING: - wfNodeState = WorkflowNodeState.INVOKED; - 
workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING); - break; case EXECUTING: - wfNodeState = WorkflowNodeState.EXECUTING; - workflowContext.getWorkflowNode().setState(NodeState.EXECUTING); - break; case OUTPUT_DATA_STAGING: - wfNodeState = WorkflowNodeState.COMPLETED; - workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); - break; case POST_PROCESSING: - wfNodeState = WorkflowNodeState.COMPLETED; - workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING); + state = ComponentState.RUNNING; break; case COMPLETED: - wfNodeState = WorkflowNodeState.COMPLETED; - workflowContext.getWorkflowNode().setState(NodeState.EXECUTED); + state = ComponentState.COMPLETED; break; case FAILED: - wfNodeState = WorkflowNodeState.FAILED; - workflowContext.getWorkflowNode().setState(NodeState.FAILED); + state = ComponentState.FAILED; break; case CANCELED: case CANCELLING: - wfNodeState = WorkflowNodeState.CANCELED; - workflowContext.getWorkflowNode().setState(NodeState.FAILED); + state = ComponentState.CANCELED; break; default: break; } - if (wfNodeState != WorkflowNodeState.UNKNOWN) { + if (state != applicationNode.getState()) { try { - updateWorkflowNodeStatus(workflowContext.getWfNodeDetails(), wfNodeState); + updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state)); } catch (RegistryException e) { - log.error("Error while updating workflow node status update to the registry. nodeInstanceId :" - + workflowContext.getWfNodeDetails().getNodeInstanceId() + " status to: " - + workflowContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e); + log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} " + + applicationNode.getId() + " status to: " + applicationNode.getState().toString() , e); } } } } + + private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException { + // FIXME: save new workflow node status to registry. 
+ } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java deleted file mode 100644 index 47bd9ca..0000000 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.workflow.core; - -import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; - -public class WorkflowContext { - private WorkflowNode workflowNode; - private WorkflowNodeDetails wfNodeDetails; - private TaskDetails taskDetails; - - public WorkflowContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { - this.workflowNode = workflowNode; - this.wfNodeDetails = wfNodeDetails; - this.taskDetails = taskDetails; - } - - public WorkflowNode getWorkflowNode() { - return workflowNode; - } - - public void setWorkflowNode(WorkflowNode workflowNode) { - this.workflowNode = workflowNode; - } - - public WorkflowNodeDetails getWfNodeDetails() { - return wfNodeDetails; - } - - public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { - this.wfNodeDetails = wfNodeDetails; - } - - public TaskDetails getTaskDetails() { - return taskDetails; - } - - public void setTaskDetails(TaskDetails taskDetails) { - this.taskDetails = taskDetails; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java index aaa3073..34ef8a7 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -142,7 +142,7 @@ public class WorkflowEnactmentService { TaskIdentifier taskIdentifier = event.getTaskIdentity(); simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); if (simpleWorkflowInterpreter != 
null) { - simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event); +// simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event); if (simpleWorkflowInterpreter.isAllDone()) { workflowMap.remove(taskIdentifier.getExperimentId()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java index e06fab5..9392461 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java @@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder; import org.apache.airavata.workflow.core.parser.JsonWorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c82cc8d..e41e3f1 100644 --- a/pom.xml +++ b/pom.xml @@ -557,6 +557,7 @@ <module>modules/credential-store</module> <module>modules/orchestrator</module> <module>modules/server</module> + <module>modules/workflow</module> <module>modules/test-suite</module> <!-- Deprecated Modules--> <!--<module>modules/integration-tests</module>-->
