Added Apache License headers and remove deprecated test methods from json workflow parser test class
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b4ca1eb5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b4ca1eb5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b4ca1eb5 Branch: refs/heads/master Commit: b4ca1eb5a361b3040e4115dae2235cb4816c8ff1 Parents: d8df3d0 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Feb 8 11:10:37 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Feb 8 11:15:58 2016 -0500 ---------------------------------------------------------------------- .../core/SimpleWorkflowInterpreter.java | 328 ------------------ .../airavata/workflow/core/WorkflowBuilder.java | 21 ++ .../workflow/core/WorkflowEnactmentService.java | 28 +- .../airavata/workflow/core/WorkflowFactory.java | 1 + .../workflow/core/WorkflowInterpreter.java | 336 +++++++++++++++++++ .../airavata/workflow/core/WorkflowParser.java | 46 --- .../core/parser/JsonWorkflowParser.java | 37 +- .../workflow/core/parser/WorkflowParser.java | 46 +++ .../core/parser/JsonWorkflowParserTest.java | 120 +------ 9 files changed, 461 insertions(+), 502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java deleted file mode 100644 index cdbf2f2..0000000 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.workflow.core; - -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; -import org.apache.airavata.model.ComponentState; -import org.apache.airavata.model.ComponentStatus; -import org.apache.airavata.model.application.io.OutputDataObjectType; -import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.model.messaging.event.ProcessIdentifier; -import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; -import org.apache.airavata.model.status.ProcessState; -import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.*; -import org.apache.airavata.workflow.core.dag.edge.Edge; -import org.apache.airavata.workflow.core.dag.nodes.*; -import org.apache.airavata.workflow.core.dag.port.OutPort; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Package-Private class - */ -class SimpleWorkflowInterpreter{ - - private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); - private List<InputNode> inputNodes; - - private ExperimentModel experiment; - - private String credentialToken; - - private String gatewayName; - - private String workflowString; - private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>(); - private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>(); - private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>(); - private Map<String, WorkflowNode> completeList = new HashMap<>(); - private Registry registry; - private List<OutputNode> completeWorkflowOutputs = new ArrayList<>(); - private RabbitMQProcessLaunchPublisher publisher; - private RabbitMQStatusConsumer statusConsumer; - private String consumerId; - private boolean continueWorkflow = true; - - public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException { - this.gatewayName = gatewayName; - setExperiment(experimentId); - this.credentialToken = credentialToken; - this.publisher = publisher; - } - - public SimpleWorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) { - this.gatewayName = gatewayName; - this.experiment = experiment; - this.credentialToken = credentialStoreToken; - this.publisher = publisher; - } - - /** - * Package-Private method. - * @throws Exception - */ - void launchWorkflow() throws Exception { -// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null); - workflowString = getWorkflow(); - WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString); - log.debug("Initialized workflow parser"); - workflowParser.parse(); - setInputNodes(workflowParser.getInputNodes()); - log.debug("Parsed the workflow and got the workflow input nodes"); - // process workflow input nodes - processWorkflowInputNodes(getInputNodes()); - if (readyList.isEmpty()) { - StringBuilder sb = new StringBuilder(); - for (InputNode inputNode : inputNodes) { - sb.append(", "); - sb.append(inputNode.getInputObject().getName()); - sb.append("="); - sb.append(inputNode.getInputObject().getValue()); - } - throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString()); - } - processReadyList(); - } - - private String getWorkflow() throws AppCatalogException, WorkflowCatalogException { - WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog(); - //FIXME: parse workflowTemplateId or experimentId -// workflowCatalog.getWorkflow(""); - return ""; - } - - // try to remove synchronization tag - /** - * Package-Private method. - * @throws RegistryException - * @throws AiravataException - */ - void processReadyList() throws RegistryException, AiravataException { - if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) { - throw new AiravataException("No workflow application node is in ready state to run"); - } - for (WorkflowNode readyNode : readyList.values()) { - if (readyNode instanceof OutputNode) { - OutputNode outputNode = (OutputNode) readyNode; - outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue()); - addToCompleteOutputNodeList(outputNode); - } else if (readyNode instanceof InputNode) { - // set input object of applications and add applications to ready List. - } else if (readyNode instanceof ApplicationNode) { - // call orchestrator to create process for the application - } else { - throw new RuntimeException("Unsupported workflow node type"); - } - } - - if (processingQueue.isEmpty() && waitingList.isEmpty()) { - try { - saveWorkflowOutputs(); - } catch (AppCatalogException e) { - throw new AiravataException("Error while updating completed workflow outputs to registry", e); - } - } - } - - private void saveWorkflowOutputs() throws AppCatalogException { - List<OutputDataObjectType> outputDataObjects = new ArrayList<>(); - for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) { - outputDataObjects.add(completeWorkflowOutput.getOutputObject()); - } -// RegistryFactory.getAppCatalog().getWorkflowCatalog() -// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects); - } - - private void processWorkflowInputNodes(List<InputNode> inputNodes) { - Set<WorkflowNode> tempNodeSet = new HashSet<>(); - for (InputNode inputNode : inputNodes) { - if (inputNode.isReady()) { - log.debug("Workflow node : " + inputNode.getId() + " is ready to execute"); - for (Edge edge : inputNode.getOutPort().getEdges()) { - edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue()); - if (edge.getToPort().getNode().isReady()) { - addToReadyQueue(edge.getToPort().getNode()); - log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); - } else { - addToWaitingQueue(edge.getToPort().getNode()); - log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue"); - - } - } - } - } - } - - - public List<InputNode> getInputNodes() throws Exception { - return inputNodes; - } - - public void setInputNodes(List<InputNode> inputNodes) { - this.inputNodes = inputNodes; - } - - private Registry getRegistry() throws RegistryException { - if (registry==null){ - registry = RegistryFactory.getRegistry(); - } - return registry; - } - - /** - * Package-Private method. - * Remove the workflow node from waiting queue and add it to the ready queue. - * @param workflowNode - Workflow Node - */ - synchronized void addToReadyQueue(WorkflowNode workflowNode) { - waitingList.remove(workflowNode.getId()); - readyList.put(workflowNode.getId(), workflowNode); - } - - private void addToWaitingQueue(WorkflowNode workflowNode) { - waitingList.put(workflowNode.getId(), workflowNode); - } - - /** - * First remove the node from ready list and then add the WfNodeContainer to the process queue. - * Note that underline data structure of the process queue is a Map. - * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails - */ - private synchronized void addToProcessingQueue(ApplicationNode applicationNode) { - readyList.remove(applicationNode.getId()); - processingQueue.put(applicationNode.getId(), applicationNode); - } - - private synchronized void addToCompleteQueue(ApplicationNode applicationNode) { - processingQueue.remove(applicationNode.getId()); - completeList.put(applicationNode.getId(), applicationNode); - } - - - private void addToCompleteOutputNodeList(OutputNode wfOutputNode) { - completeWorkflowOutputs.add(wfOutputNode); - readyList.remove(wfOutputNode.getId()); - } - - boolean isAllDone() { - return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty()); - } - - private void setExperiment(String experimentId) throws RegistryException { - experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); - log.debug("Retrieve Experiment for experiment id : " + experimentId); - } - -/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) { - - String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); - log.debug("Task Output changed event received for workflow node : " + - taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); - WorkflowNode workflowNode = processingQueue.get(taskId); - Set<WorkflowNode> tempWfNodeSet = new HashSet<>(); - if (workflowNode != null) { - if (workflowNode instanceof ApplicationNode) { - ApplicationNode applicationNode = (ApplicationNode) workflowNode; - // Workflow node can have one to many output ports and each output port can have one to many links - for (OutPort outPort : applicationNode.getOutputPorts()) { - for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { - if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { - outPort.getOutputObject().setValue(outputDataObjectType.getValue()); - break; - } - } - for (Edge edge : outPort.getEdges()) { - edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); - if (edge.getToPort().getNode().isReady()) { - addToReadyQueue(edge.getToPort().getNode()); - } - } - } - addToCompleteQueue(applicationNode); - log.debug("removed task from processing queue : " + taskId); - } - try { - processReadyList(); - } catch (Exception e) { - log.error("Error while processing ready workflow nodes", e); - continueWorkflow = false; - } - } - }*/ - - void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) { - ProcessState processState = processStatusChangeEvent.getState(); - ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity(); - String processId = processIdentity.getProcessId(); - ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId); - if (applicationNode != null) { - ComponentState state = applicationNode.getState(); - switch (processState) { - case CREATED: - case VALIDATED: - case STARTED: - break; - case CONFIGURING_WORKSPACE: - case PRE_PROCESSING: - case INPUT_DATA_STAGING: - case EXECUTING: - case OUTPUT_DATA_STAGING: - case POST_PROCESSING: - state = ComponentState.RUNNING; - break; - case COMPLETED: - state = ComponentState.COMPLETED; - break; - case FAILED: - state = ComponentState.FAILED; - break; - case CANCELED: - case CANCELLING: - state = ComponentState.CANCELED; - break; - default: - break; - } - if (state != applicationNode.getState()) { - try { - updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state)); - } catch (RegistryException e) { - log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} " - + applicationNode.getId() + " status to: " + applicationNode.getState().toString() , e); - } - } - } - - } - - private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException { - // FIXME: save new workflow node status to registry. - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java index a794282..fb97161 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java @@ -1,3 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.airavata.workflow.core; import org.apache.airavata.workflow.core.dag.nodes.InputNode; http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java index 34ef8a7..8339aea 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java @@ -46,11 +46,11 @@ public class WorkflowEnactmentService { private final RabbitMQStatusConsumer statusConsumer; private String consumerId; private ExecutorService executor; - private Map<String,SimpleWorkflowInterpreter> workflowMap; + private Map<String,WorkflowInterpreter> workflowMap; private WorkflowEnactmentService () throws AiravataException { executor = Executors.newFixedThreadPool(getThreadPoolSize()); - workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>(); + workflowMap = new ConcurrentHashMap<String, WorkflowInterpreter>(); statusConsumer = new RabbitMQStatusConsumer(); consumerId = statusConsumer.listen(new TaskMessageHandler()); // register the shutdown hook to un-bind status consumer. @@ -73,10 +73,10 @@ public class WorkflowEnactmentService { String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws Exception { - SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter( + WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter( experimentId, credentialToken,gatewayName, publisher); - workflowMap.put(experimentId, simpleWorkflowInterpreter); - simpleWorkflowInterpreter.launchWorkflow(); + workflowMap.put(experimentId, workflowInterpreter); + workflowInterpreter.launchWorkflow(); } @@ -125,13 +125,13 @@ public class WorkflowEnactmentService { private void process() { String message; - SimpleWorkflowInterpreter simpleWorkflowInterpreter; + WorkflowInterpreter workflowInterpreter; if (msgCtx.getType() == MessageType.PROCESS) { ProcessStatusChangeEvent event = ((ProcessStatusChangeEvent) msgCtx.getEvent()); ProcessIdentifier processIdentity = event.getProcessIdentity(); - simpleWorkflowInterpreter = getInterpreter(processIdentity.getExperimentId()); - if (simpleWorkflowInterpreter != null) { - simpleWorkflowInterpreter.handleProcessStatusChangeEvent(event); + workflowInterpreter = getInterpreter(processIdentity.getExperimentId()); + if (workflowInterpreter != null) { + workflowInterpreter.handleProcessStatusChangeEvent(event); } else { // this happens when Task status messages comes after the Taskoutput messages,as we have worked on // output changes it is ok to ignore this. @@ -140,10 +140,10 @@ public class WorkflowEnactmentService { }else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) { TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent(); TaskIdentifier taskIdentifier = event.getTaskIdentity(); - simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); - if (simpleWorkflowInterpreter != null) { -// simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event); - if (simpleWorkflowInterpreter.isAllDone()) { + workflowInterpreter = getInterpreter(taskIdentifier.getExperimentId()); + if (workflowInterpreter != null) { +// workflowInterpreter.handleTaskOutputChangeEvent(event); + if (workflowInterpreter.isAllDone()) { workflowMap.remove(taskIdentifier.getExperimentId()); } } else { @@ -157,7 +157,7 @@ public class WorkflowEnactmentService { } } - private SimpleWorkflowInterpreter getInterpreter(String experimentId){ + private WorkflowInterpreter getInterpreter(String experimentId){ return workflowMap.get(experimentId); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java index 9392461..cb76790 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java @@ -24,6 +24,7 @@ package org.apache.airavata.workflow.core; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.workflow.core.parser.JsonWorkflowParser; +import org.apache.airavata.workflow.core.parser.WorkflowParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java new file mode 100644 index 0000000..b42e7ac --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java @@ -0,0 +1,336 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.model.ComponentState; +import org.apache.airavata.model.ComponentStatus; +import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.*; +import org.apache.airavata.workflow.core.parser.WorkflowParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Package-Private class + */ +class WorkflowInterpreter { + + private static final Logger log = LoggerFactory.getLogger(WorkflowInterpreter.class); + private List<InputNode> inputNodes; + + private ExperimentModel experiment; + + private String credentialToken; + + private String gatewayName; + + private String workflowString; + private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>(); + private Map<String, WorkflowNode> completeList = new HashMap<>(); + private Registry registry; + private List<OutputNode> completeWorkflowOutputs = new ArrayList<>(); + private RabbitMQProcessLaunchPublisher publisher; + private RabbitMQStatusConsumer statusConsumer; + private String consumerId; + private boolean continueWorkflow = true; + + public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException { + this.gatewayName = gatewayName; + setExperiment(experimentId); + this.credentialToken = credentialToken; + this.publisher = publisher; + } + + public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) { + this.gatewayName = gatewayName; + this.experiment = experiment; + this.credentialToken = credentialStoreToken; + this.publisher = publisher; + } + + /** + * Package-Private method. + * + * @throws Exception + */ + void launchWorkflow() throws Exception { +// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null); + workflowString = getWorkflow(); + WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString); + log.debug("Initialized workflow parser"); + workflowParser.parse(); + setInputNodes(workflowParser.getInputNodes()); + log.debug("Parsed the workflow and got the workflow input nodes"); + // process workflow input nodes + processWorkflowInputNodes(getInputNodes()); + if (readyList.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (InputNode inputNode : inputNodes) { + sb.append(", "); + sb.append(inputNode.getInputObject().getName()); + sb.append("="); + sb.append(inputNode.getInputObject().getValue()); + } + throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString()); + } + processReadyList(); + } + + private String getWorkflow() throws AppCatalogException, WorkflowCatalogException { + WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog(); + //FIXME: parse workflowTemplateId or experimentId +// workflowCatalog.getWorkflow(""); + return ""; + } + + // try to remove synchronization tag + + /** + * Package-Private method. + * + * @throws RegistryException + * @throws AiravataException + */ + void processReadyList() throws RegistryException, AiravataException { + if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) { + throw new AiravataException("No workflow application node is in ready state to run"); + } + for (WorkflowNode readyNode : readyList.values()) { + if (readyNode instanceof OutputNode) { + OutputNode outputNode = (OutputNode) readyNode; + outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue()); + addToCompleteOutputNodeList(outputNode); + } else if (readyNode instanceof InputNode) { + // FIXME: set input object of applications and add applications to ready List. + } else if (readyNode instanceof ApplicationNode) { + // FIXME: call orchestrator to create process for the application + } else { + throw new RuntimeException("Unsupported workflow node type"); + } + } + + if (processingQueue.isEmpty() && waitingList.isEmpty()) { + try { + saveWorkflowOutputs(); + } catch (AppCatalogException e) { + throw new AiravataException("Error while updating completed workflow outputs to registry", e); + } + } + } + + private void saveWorkflowOutputs() throws AppCatalogException { + List<OutputDataObjectType> outputDataObjects = new ArrayList<>(); + for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) { + outputDataObjects.add(completeWorkflowOutput.getOutputObject()); + } + // FIXME: save workflow output to registry. +// RegistryFactory.getAppCatalog().getWorkflowCatalog() +// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects); + } + + private void processWorkflowInputNodes(List<InputNode> inputNodes) { + Set<WorkflowNode> tempNodeSet = new HashSet<>(); + for (InputNode inputNode : inputNodes) { + if (inputNode.isReady()) { + log.debug("Workflow node : " + inputNode.getId() + " is ready to execute"); + for (Edge edge : inputNode.getOutPort().getEdges()) { + edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue"); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue"); + + } + } + } + } + } + + + public List<InputNode> getInputNodes() throws Exception { + return inputNodes; + } + + public void setInputNodes(List<InputNode> inputNodes) { + this.inputNodes = inputNodes; + } + + private Registry getRegistry() throws RegistryException { + if (registry == null) { + registry = RegistryFactory.getRegistry(); + } + return registry; + } + + /** + * Package-Private method. + * Remove the workflow node from waiting queue and add it to the ready queue. + * + * @param workflowNode - Workflow Node + */ + synchronized void addToReadyQueue(WorkflowNode workflowNode) { + waitingList.remove(workflowNode.getId()); + readyList.put(workflowNode.getId(), workflowNode); + } + + private void addToWaitingQueue(WorkflowNode workflowNode) { + waitingList.put(workflowNode.getId(), workflowNode); + } + + /** + * First remove the node from ready list and then add the WfNodeContainer to the process queue. + * Note that underline data structure of the process queue is a Map. + * + * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails + */ + private synchronized void addToProcessingQueue(ApplicationNode applicationNode) { + readyList.remove(applicationNode.getId()); + processingQueue.put(applicationNode.getId(), applicationNode); + } + + private synchronized void addToCompleteQueue(ApplicationNode applicationNode) { + processingQueue.remove(applicationNode.getId()); + completeList.put(applicationNode.getId(), applicationNode); + } + + + private void addToCompleteOutputNodeList(OutputNode wfOutputNode) { + completeWorkflowOutputs.add(wfOutputNode); + readyList.remove(wfOutputNode.getId()); + } + + boolean isAllDone() { + return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty()); + } + + private void setExperiment(String experimentId) throws RegistryException { + experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + log.debug("Retrieve Experiment for experiment id : " + experimentId); + } + +/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) { + + String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); + WorkflowNode workflowNode = processingQueue.get(taskId); + Set<WorkflowNode> tempWfNodeSet = new HashSet<>(); + if (workflowNode != null) { + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + // Workflow node can have one to many output ports and each output port can have one to many links + for (OutPort outPort : applicationNode.getOutputPorts()) { + for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; + } + } + for (Edge edge : outPort.getEdges()) { + edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } + } + } + addToCompleteQueue(applicationNode); + log.debug("removed task from processing queue : " + taskId); + } + try { + processReadyList(); + } catch (Exception e) { + log.error("Error while processing ready workflow nodes", e); + continueWorkflow = false; + } + } + }*/ + + void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) { + ProcessState processState = processStatusChangeEvent.getState(); + ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity(); + String processId = processIdentity.getProcessId(); + ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId); + if (applicationNode != null) { + ComponentState state = applicationNode.getState(); + switch (processState) { + case CREATED: + case VALIDATED: + case STARTED: + break; + case CONFIGURING_WORKSPACE: + case PRE_PROCESSING: + case INPUT_DATA_STAGING: + case EXECUTING: + case OUTPUT_DATA_STAGING: + case POST_PROCESSING: + state = ComponentState.RUNNING; + break; + case COMPLETED: + state = ComponentState.COMPLETED; + // FIXME: read output form registry and set it to node outputport then continue to next application. + break; + case FAILED: + state = ComponentState.FAILED; + // FIXME: fail workflow. + break; + case CANCELED: + case CANCELLING: + state = ComponentState.CANCELED; + // FIXME: cancel workflow. + break; + default: + break; + } + if (state != applicationNode.getState()) { + try { + updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state)); + } catch (RegistryException e) { + log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} " + + applicationNode.getId() + " status to: " + applicationNode.getState().toString(), e); + } + } + } + + } + + private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException { + // FIXME: save new workflow node status to registry. + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java deleted file mode 100644 index 46bc1d8..0000000 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.workflow.core; - -import org.apache.airavata.workflow.core.dag.edge.Edge; -import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; -import org.apache.airavata.workflow.core.dag.nodes.InputNode; -import org.apache.airavata.workflow.core.dag.nodes.OutputNode; -import org.apache.airavata.workflow.core.dag.port.Port; - -import java.util.List; - -public interface WorkflowParser { - - public void parse() throws Exception; - - public List<InputNode> getInputNodes() throws Exception; - - public List<OutputNode> getOutputNodes() throws Exception; - - public List<ApplicationNode> getApplicationNodes() throws Exception; - - public List<Port> getPorts() throws Exception; - - public List<Edge> getEdges() throws Exception; - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java index ede69e3..f6bb084 100644 --- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java @@ -1,19 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.airavata.workflow.core.parser; import com.google.gson.JsonObject; -import org.apache.airavata.workflow.core.WorkflowParser; import org.apache.airavata.workflow.core.dag.edge.Edge; import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; import org.apache.airavata.workflow.core.dag.nodes.InputNode; import org.apache.airavata.workflow.core.dag.nodes.OutputNode; import org.apache.airavata.workflow.core.dag.port.Port; +import java.util.ArrayList; import java.util.List; -/** - * Created by syodage on 1/27/16. - */ -public class JsonWorkflowParser implements WorkflowParser{ +public class JsonWorkflowParser implements WorkflowParser { private final String workflow; private List<InputNode> inputs; @@ -24,8 +42,15 @@ public class JsonWorkflowParser implements WorkflowParser{ public JsonWorkflowParser(String jsonWorkflowString) { workflow = jsonWorkflowString; + + inputs = new ArrayList<>(); + outputs = new ArrayList<>(); + applications = new ArrayList<>(); + ports = new ArrayList<>(); + edges = new ArrayList<>(); } + @Override public void parse() throws Exception { // TODO parse json string and construct components @@ -69,7 +94,7 @@ public class JsonWorkflowParser implements WorkflowParser{ return null; } - private Port createPort(JsonObject jPort){ + private Port createPort(JsonObject jPort) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java new file mode 100644 index 0000000..dc18c9e --- /dev/null +++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.workflow.core.parser; + +import org.apache.airavata.workflow.core.dag.edge.Edge; +import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; +import org.apache.airavata.workflow.core.dag.nodes.InputNode; +import org.apache.airavata.workflow.core.dag.nodes.OutputNode; +import org.apache.airavata.workflow.core.dag.port.Port; + +import java.util.List; + +public interface WorkflowParser { + + public void parse() throws Exception; + + public List<InputNode> getInputNodes() throws Exception; + + public List<OutputNode> getOutputNodes() throws Exception; + + public List<ApplicationNode> getApplicationNodes() throws Exception; + + public List<Port> getPorts() throws Exception; + + public List<Edge> getEdges() throws Exception; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java ---------------------------------------------------------------------- diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java index 712944d..3fedc9c 100644 --- a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java +++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java @@ -1,51 +1,25 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - package org.apache.airavata.workflow.core.parser; -import org.apache.airavata.model.application.io.DataType; -import org.apache.airavata.model.application.io.InputDataObjectType; -import org.apache.airavata.model.experiment.ExperimentModel; -import org.apache.airavata.workflow.core.WorkflowParser; -import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode; -import org.apache.airavata.workflow.core.dag.nodes.InputNode; -import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode; -import org.apache.airavata.workflow.core.dag.nodes.OutputNode; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.File; +import java.io.InputStream; +import static org.junit.Assert.*; + +/** + * Created by syodage on 2/8/16. + */ public class JsonWorkflowParserTest { + private String workflowString; + + @Before public void setUp() throws Exception { - + InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("TestWorkflow.json"); } @After @@ -54,77 +28,7 @@ public class JsonWorkflowParserTest { } @Test - public void testWorkflowParse() throws Exception { - Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf")); - InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf")); - BufferedReader br = new BufferedReader(isr); - StringBuffer sb = new StringBuffer(); - String nextLine = br.readLine(); - while (nextLine != null) { - sb.append(nextLine); - nextLine = br.readLine(); - } -// Workflow workflow = new Workflow(sb.toString()); - ExperimentModel experiment = new ExperimentModel(); - InputDataObjectType x = new InputDataObjectType(); - x.setValue("6"); - x.setType(DataType.STRING); - x.setName("x"); - - InputDataObjectType y = new InputDataObjectType(); - y.setValue("8"); - y.setType(DataType.STRING); - y.setName("y"); - - InputDataObjectType z = new InputDataObjectType(); - z.setValue("10"); - z.setType(DataType.STRING); - z.setName("y_2"); - - List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>(); - inputs.add(x); - inputs.add(y); - inputs.add(z); - experiment.setExperimentInputs(inputs); - // create parser - WorkflowParser parser = new JsonWorkflowParser("workflow string"); - parser.parse(); - List<InputNode> inputNodes = parser.getInputNodes(); - Assert.assertNotNull(inputNodes); - Assert.assertEquals(3, inputNodes.size()); - for (InputNode inputNode : inputNodes) { - Assert.assertNotNull(inputNode.getOutPort()); - Assert.assertNotNull(inputNode.getInputObject()); - } - - Map<String, WorkflowNode> wfNodes = getWorkflowNodeMap(parser.getApplicationNodes()); - for (String wfId : wfNodes.keySet()) { - WorkflowNode wfNode = wfNodes.get(wfId); - if (wfNode instanceof ApplicationNode) { - ApplicationNode node = (ApplicationNode) wfNode; - Assert.assertEquals(2, node.getInputPorts().size()); - Assert.assertNotNull(node.getInputPorts().get(0).getInputObject()); - Assert.assertNotNull(node.getInputPorts().get(1).getInputObject()); - Assert.assertNotNull(node.getInputPorts().get(0).getEdge()); - Assert.assertNotNull(node.getInputPorts().get(1).getEdge()); - - Assert.assertEquals(1, node.getOutputPorts().size()); - Assert.assertEquals(1, node.getOutputPorts().get(0).getEdges().size()); - Assert.assertNotNull(node.getOutputPorts().get(0).getEdges().get(0)); - } else if (wfNode instanceof OutputNode) { - OutputNode outputNode = (OutputNode) wfNode; - Assert.assertNotNull(outputNode.getInPort()); - } - } - - } - - private Map<String, WorkflowNode> getWorkflowNodeMap(List<ApplicationNode> applicationNodes) { - Map<String, WorkflowNode> map = new HashMap<>(); - for (ApplicationNode applicationNode : applicationNodes) { - map.put(applicationNode.getApplicationId(), applicationNode); - } + public void testParse() throws Exception { - return map; } } \ No newline at end of file
