Added a temporary publisher to publish task status change events and output change events; refactored the workflow interpreter code and improved it to launch and iterate the workflow
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/27f6f1b1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/27f6f1b1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/27f6f1b1 Branch: refs/heads/master Commit: 27f6f1b12c83448b702064df5f90fb4103ec36c4 Parents: 20d6817 Author: shamrath <[email protected]> Authored: Fri Feb 20 20:29:18 2015 -0500 Committer: shamrath <[email protected]> Committed: Fri Feb 20 20:29:18 2015 -0500 ---------------------------------------------------------------------- .../simple/workflow/engine/ProcessPack.java | 62 ++++++ .../engine/SimpleWorkflowInterpreter.java | 204 ++++++++++++++----- .../simple/workflow/engine/WfNodeContainer.java | 51 ----- .../simple/workflow/engine/WorkflowUtil.java | 10 + .../engine/dag/nodes/WorkflowInputNodeImpl.java | 3 +- 5 files changed, 228 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java new file mode 100644 index 0000000..ab8b724 --- /dev/null +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.ariavata.simple.workflow.engine; + +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; + +public class ProcessPack { + private WorkflowNode workflowNode; + private WorkflowNodeDetails wfNodeDetails; + private TaskDetails taskDetails; + + public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) { + this.workflowNode = workflowNode; + this.wfNodeDetails = wfNodeDetails; + this.taskDetails = taskDetails; + } + + public WorkflowNode getWorkflowNode() { + return workflowNode; + } + + public void setWorkflowNode(WorkflowNode workflowNode) { + this.workflowNode = workflowNode; + } + + public WorkflowNodeDetails getWfNodeDetails() { + return wfNodeDetails; + } + + public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { + this.wfNodeDetails = wfNodeDetails; + } + + public TaskDetails getTaskDetails() { + return taskDetails; + } + + public void setTaskDetails(TaskDetails taskDetails) { + this.taskDetails = taskDetails; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java 
---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index b4ec3cb..e122fa6 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -21,13 +21,18 @@ package org.apache.ariavata.simple.workflow.engine; +import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.util.ExperimentModelUtil; import org.apache.airavata.model.workspace.experiment.ExecutionUnit; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; @@ -36,6 +41,7 @@ import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.registry.cpi.RegistryModelType; +import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge; import 
org.apache.ariavata.simple.workflow.engine.dag.nodes.ApplicationNode; import org.apache.ariavata.simple.workflow.engine.dag.nodes.NodeState; import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNode; @@ -43,6 +49,8 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -53,6 +61,7 @@ import java.util.Set; public class SimpleWorkflowInterpreter implements Runnable{ + private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class); private List<WorkflowInputNode> workflowInputNodes; @@ -60,11 +69,12 @@ public class SimpleWorkflowInterpreter implements Runnable{ private String credentialToken; - private List<WorkflowNode> readList = new ArrayList<WorkflowNode>(); - private List<WorkflowNode> waitingList = new ArrayList<WorkflowNode>(); - private Map<String,WfNodeContainer> processingQueue = new HashMap<String, WfNodeContainer>(); - private List<WorkflowNode> completeList = new ArrayList<WorkflowNode>(); + private Map<String, WorkflowNode> readList = new HashMap<String, WorkflowNode>(); + private Map<String, WorkflowNode> waitingList = new HashMap<String, WorkflowNode>(); + private Map<String, ProcessPack> processingQueue = new HashMap<String, ProcessPack>(); + private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>(); private Registry registry; + private EventBus eventBus = new EventBus(); public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) { // read the workflow file and build the topology to a DAG. 
Then execute that dag @@ -77,6 +87,9 @@ public class SimpleWorkflowInterpreter implements Runnable{ public void launchWorkflow() throws Exception { // process workflow input nodes + WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); + WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); + setWorkflowInputNodes(workflowParser.parse()); processWorkflowInputNodes(getWorkflowInputNodes()); processReadyList(); // process workflow application nodes @@ -85,11 +98,11 @@ public class SimpleWorkflowInterpreter implements Runnable{ // try to remove synchronization tag private synchronized void processReadyList() { - for (WorkflowNode readyNode : readList) { + for (WorkflowNode readyNode : readList.values()) { try { WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); TaskDetails process = getProcess(workflowNodeDetails); - processingQueue.put(process.getTaskID(), new WfNodeContainer(readyNode, workflowNodeDetails)); + addToProcessingQueue(new ProcessPack(readyNode, workflowNodeDetails, process)); publishToProcessQueue(process); } catch (RegistryException e) { // FIXME : handle this exception @@ -98,6 +111,8 @@ public class SimpleWorkflowInterpreter implements Runnable{ } private void publishToProcessQueue(TaskDetails process) { + Thread thread = new Thread(new TempPublisher(process, eventBus)); + thread.start(); //TODO: publish to process queue. 
} @@ -150,31 +165,21 @@ public class SimpleWorkflowInterpreter implements Runnable{ Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); for (WorkflowInputNode wfInputNode : wfInputNodes) { if (wfInputNode.isReady()) { - -// for (Edge edge : wfInputNode.getOutputLinks()) { -// WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject()); -// tempNodeSet.add(edge.getToPort().getNode()); -// } - } - } - for (WorkflowNode workflowNode : tempNodeSet) { - if (workflowNode.isReady()) { - readList.add(workflowNode); - } else { - waitingList.add(workflowNode); + for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { + edge.getToPort().setInputObject( + WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject())); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + } + } } } } public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception { - if (workflowInputNodes == null) { - // read workflow description from registry and parse it - WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); - List<WorkflowInputNode> wfInputNodes = wfFactory.getWorkflowParser(experiment.getExperimentID(), - credentialToken).parse(); - setWorkflowInputNodes(wfInputNodes); - } return workflowInputNodes; } @@ -208,29 +213,29 @@ public class SimpleWorkflowInterpreter implements Runnable{ @Subscribe public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){ String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); - WfNodeContainer wfNodeContainer = processingQueue.get(taskId); + ProcessPack processPack = processingQueue.get(taskId); Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); - if (wfNodeContainer != null) { - WorkflowNode workflowNode = wfNodeContainer.getWorkflowNode(); + if (processPack != null) { + WorkflowNode workflowNode = processPack.getWorkflowNode(); if 
(workflowNode instanceof ApplicationNode) { ApplicationNode applicationNode = (ApplicationNode) workflowNode; // Workflow node can have one to many output ports and each output port can have one to many links for (OutPort outPort : applicationNode.getOutputPorts()) { -// for (Edge edge : outPort.getOutputLinks()) { -// WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject()); -// tempWfNodeSet.add(edge.getToPort().getNode()); -// } - } - - for (WorkflowNode node : tempWfNodeSet) { - if (node.isReady()) { - waitingList.remove(node); - readList.add(node); + for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) { + if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) { + outPort.getOutputObject().setValue(outputDataObjectType.getValue()); + break; + } + } + for (Edge edge : outPort.getOutEdges()) { + WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } } } } processingQueue.remove(taskId); - processReadyList(); } } @@ -238,8 +243,8 @@ public class SimpleWorkflowInterpreter implements Runnable{ @Subscribe public void taskStatusChanged(TaskStatusChangeEvent taskStatus){ String taskId = taskStatus.getTaskIdentity().getTaskId(); - WfNodeContainer wfNodeContainer = processingQueue.get(taskId); - if (wfNodeContainer != null) { + ProcessPack processPack = processingQueue.get(taskId); + if (processPack != null) { WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN; switch (taskStatus.getState()) { case WAITING: @@ -247,25 +252,25 @@ public class SimpleWorkflowInterpreter implements Runnable{ case STARTED: break; case PRE_PROCESSING: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); + processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); break; case INPUT_DATA_STAGING: - 
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); + processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); break; case OUTPUT_DATA_STAGING: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); + processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); break; case EXECUTING: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTING); + processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING); break; case POST_PROCESSING: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); + processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); break; case COMPLETED: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTED); + processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED); break; case FAILED: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED); + processPack.getWorkflowNode().setNodeState(NodeState.FAILED); break; case UNKNOWN: break; @@ -273,14 +278,14 @@ public class SimpleWorkflowInterpreter implements Runnable{ break; case CANCELED: case CANCELING: - wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED); + processPack.getWorkflowNode().setNodeState(NodeState.FAILED); break; default: break; } if (wfNodeState != WorkflowNodeState.UNKNOWN) { try { - updateWorkflowNodeStatus(wfNodeContainer.getWfNodeDetails(), wfNodeState); + updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState); } catch (RegistryException e) { // TODO: handle this. } @@ -289,8 +294,107 @@ public class SimpleWorkflowInterpreter implements Runnable{ } + /** + * Remove the workflow node from waiting queue and add it to the ready queue. 
+ * @param workflowNode - Workflow Node + */ + private synchronized void addToReadyQueue(WorkflowNode workflowNode) { + waitingList.remove(workflowNode.getNodeId()); + readList.put(workflowNode.getNodeId(), workflowNode); + } + + private void addToWaitingQueue(WorkflowNode workflowNode) { + waitingList.put(workflowNode.getNodeId(), workflowNode); + } + + /** + * First remove the node from ready list and then add the WfNodeContainer to the process queue. + * Note that underline data structure of the process queue is a Map. + * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails + */ + private synchronized void addToProcessingQueue(ProcessPack processPack) { + readList.remove(processPack.getWorkflowNode().getNodeId()); + processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack); + } + + private synchronized void addToCompleteQueue(ProcessPack processPack) { + processingQueue.remove(processPack.getTaskDetails().getTaskID()); + completeList.put(processPack.getTaskDetails().getTaskID(), processPack); + } + + @Override public void run() { // TODO: Auto generated method body. 
+ try { + launchWorkflow(); + while (!(waitingList.isEmpty() && readList.isEmpty())) { + processReadyList(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + class TempPublisher implements Runnable { + private TaskDetails tempTaskDetails; + private EventBus tempEventBus; + + public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) { + this.tempTaskDetails = tempTaskDetails; + this.tempEventBus = tempEventBus; + } + + @Override + public void run() { + try { + TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null); + TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier); + tempEventBus.post(statusChangeEvent); + Thread.sleep(1000); + + List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs(); + List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs(); + log.info("************** 
Task output change event fired for application id :" + tempTaskDetails.getApplicationId()); + if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) + + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } else if (tempTaskDetails.getApplicationId().equals("Subtract")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) - + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } else if (tempTaskDetails.getApplicationId().equals("Multiply")) { + applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) * + Integer.parseInt(applicationInputs.get(1).getValue())) + ""); + } + TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier); + eventBus.post(taskOutputChangeEvent); + + } catch (InterruptedException e) { + log.error("Thread was interrupted while sleeping"); + } + + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java deleted file mode 100644 index e0cebd6..0000000 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.ariavata.simple.workflow.engine; - -import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; -import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; - -public class WfNodeContainer { - private WorkflowNode workflowNode; - private WorkflowNodeDetails wfNodeDetails; - - public WfNodeContainer(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails) { - this.workflowNode = workflowNode; - this.wfNodeDetails = wfNodeDetails; - } - - public WorkflowNode getWorkflowNode() { - return workflowNode; - } - - public void setWorkflowNode(WorkflowNode workflowNode) { - this.workflowNode = workflowNode; - } - - public WorkflowNodeDetails getWfNodeDetails() { - return wfNodeDetails; - } - - public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) { - this.wfNodeDetails = wfNodeDetails; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java index 71d0288..d4bbad3 100644 --- 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java @@ -21,12 +21,21 @@ package org.apache.ariavata.simple.workflow.engine; +import com.google.common.eventbus.EventBus; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.persistance.registry.jpa.model.TaskDetail; public class WorkflowUtil { public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){ + if (toInputObj == null) { + // TODO : throw an error + } toInputObj.setValue(fromInputObj.getValue()); if (fromInputObj.getApplicationArgument() != null && !fromInputObj.getApplicationArgument().trim().equals("")) { @@ -40,4 +49,5 @@ public class WorkflowUtil { return inputData; } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java index 2f912b3..f419ae2 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java +++ 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java @@ -67,7 +67,8 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode { @Override public boolean isReady() { - return inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""); + return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); } @Override
