Added temporary publisher to publish task status change events and output change 
events; refactored workflow interpreter code and improved it to launch and 
iterate the workflow


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/27f6f1b1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/27f6f1b1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/27f6f1b1

Branch: refs/heads/master
Commit: 27f6f1b12c83448b702064df5f90fb4103ec36c4
Parents: 20d6817
Author: shamrath <[email protected]>
Authored: Fri Feb 20 20:29:18 2015 -0500
Committer: shamrath <[email protected]>
Committed: Fri Feb 20 20:29:18 2015 -0500

----------------------------------------------------------------------
 .../simple/workflow/engine/ProcessPack.java     |  62 ++++++
 .../engine/SimpleWorkflowInterpreter.java       | 204 ++++++++++++++-----
 .../simple/workflow/engine/WfNodeContainer.java |  51 -----
 .../simple/workflow/engine/WorkflowUtil.java    |  10 +
 .../engine/dag/nodes/WorkflowInputNodeImpl.java |   3 +-
 5 files changed, 228 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
new file mode 100644
index 0000000..ab8b724
--- /dev/null
+++ 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.ariavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessPack {
+    private WorkflowNode workflowNode;
+    private WorkflowNodeDetails wfNodeDetails;
+    private TaskDetails taskDetails;
+
+    public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails 
wfNodeDetails, TaskDetails taskDetails) {
+        this.workflowNode = workflowNode;
+        this.wfNodeDetails = wfNodeDetails;
+        this.taskDetails = taskDetails;
+    }
+
+    public WorkflowNode getWorkflowNode() {
+        return workflowNode;
+    }
+
+    public void setWorkflowNode(WorkflowNode workflowNode) {
+        this.workflowNode = workflowNode;
+    }
+
+    public WorkflowNodeDetails getWfNodeDetails() {
+        return wfNodeDetails;
+    }
+
+    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+        this.wfNodeDetails = wfNodeDetails;
+    }
+
+    public TaskDetails getTaskDetails() {
+        return taskDetails;
+    }
+
+    public void setTaskDetails(TaskDetails taskDetails) {
+        this.taskDetails = taskDetails;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index b4ec3cb..e122fa6 100644
--- 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -21,13 +21,18 @@
 
 package org.apache.ariavata.simple.workflow.engine;
 
+import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
 import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -36,6 +41,7 @@ import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.ApplicationNode;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.NodeState;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
@@ -43,6 +49,8 @@ import 
org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.ariavata.simple.workflow.engine.dag.port.InPort;
 import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,6 +61,7 @@ import java.util.Set;
 
 public class SimpleWorkflowInterpreter implements Runnable{
 
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
 
     private List<WorkflowInputNode> workflowInputNodes;
 
@@ -60,11 +69,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     private String credentialToken;
 
-    private List<WorkflowNode> readList = new ArrayList<WorkflowNode>();
-    private List<WorkflowNode> waitingList = new ArrayList<WorkflowNode>();
-    private Map<String,WfNodeContainer> processingQueue = new HashMap<String, 
WfNodeContainer>();
-    private List<WorkflowNode> completeList = new ArrayList<WorkflowNode>();
+    private Map<String, WorkflowNode> readList = new HashMap<String, 
WorkflowNode>();
+    private Map<String, WorkflowNode> waitingList = new HashMap<String, 
WorkflowNode>();
+    private Map<String, ProcessPack> processingQueue = new HashMap<String, 
ProcessPack>();
+    private Map<String, ProcessPack> completeList = new HashMap<String, 
ProcessPack>();
     private Registry registry;
+    private EventBus eventBus = new EventBus();
 
     public SimpleWorkflowInterpreter(Experiment experiment, String 
credentialStoreToken) {
         // read the workflow file and build the topology to a DAG. Then 
execute that dag
@@ -77,6 +87,9 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     public void launchWorkflow() throws Exception {
         // process workflow input nodes
+        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+        WorkflowParser workflowParser = 
wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+        setWorkflowInputNodes(workflowParser.parse());
         processWorkflowInputNodes(getWorkflowInputNodes());
         processReadyList();
         // process workflow application nodes
@@ -85,11 +98,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     // try to remove synchronization tag
     private synchronized void processReadyList() {
-        for (WorkflowNode readyNode : readList) {
+        for (WorkflowNode readyNode : readList.values()) {
             try {
                 WorkflowNodeDetails workflowNodeDetails = 
createWorkflowNodeDetails(readyNode);
                 TaskDetails process = getProcess(workflowNodeDetails);
-                processingQueue.put(process.getTaskID(), new 
WfNodeContainer(readyNode, workflowNodeDetails));
+                addToProcessingQueue(new ProcessPack(readyNode, 
workflowNodeDetails, process));
                 publishToProcessQueue(process);
             } catch (RegistryException e) {
                 // FIXME : handle this exception
@@ -98,6 +111,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
     }
 
     private void publishToProcessQueue(TaskDetails process) {
+        Thread thread = new Thread(new TempPublisher(process, eventBus));
+        thread.start();
         //TODO: publish to process queue.
     }
 
@@ -150,31 +165,21 @@ public class SimpleWorkflowInterpreter implements 
Runnable{
         Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
         for (WorkflowInputNode wfInputNode : wfInputNodes) {
             if (wfInputNode.isReady()) {
-
-//                for (Edge edge : wfInputNode.getOutputLinks()) {
-//                    WorkflowUtil.copyValues(wfInputNode.getInputObject(), 
edge.getToPort().getInputObject());
-//                    tempNodeSet.add(edge.getToPort().getNode());
-//                }
-            }
-        }
-        for (WorkflowNode workflowNode : tempNodeSet) {
-            if (workflowNode.isReady()) {
-                readList.add(workflowNode);
-            } else {
-                waitingList.add(workflowNode);
+                for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+                    edge.getToPort().setInputObject(
+                            
WorkflowUtil.copyValues(wfInputNode.getInputObject(), 
edge.getToPort().getInputObject()));
+                    if (edge.getToPort().getNode().isReady()) {
+                        addToReadyQueue(edge.getToPort().getNode());
+                    } else {
+                        addToWaitingQueue(edge.getToPort().getNode());
+                    }
+                }
             }
         }
     }
 
 
     public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
-        if (workflowInputNodes == null) {
-            // read workflow description from registry and parse it
-            WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
-            List<WorkflowInputNode> wfInputNodes = 
wfFactory.getWorkflowParser(experiment.getExperimentID(),
-                    credentialToken).parse();
-            setWorkflowInputNodes(wfInputNodes);
-        }
         return workflowInputNodes;
     }
 
@@ -208,29 +213,29 @@ public class SimpleWorkflowInterpreter implements 
Runnable{
     @Subscribe
     public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
         String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
-        WfNodeContainer wfNodeContainer = processingQueue.get(taskId);
+        ProcessPack processPack = processingQueue.get(taskId);
         Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
-        if (wfNodeContainer != null) {
-            WorkflowNode workflowNode = wfNodeContainer.getWorkflowNode();
+        if (processPack != null) {
+            WorkflowNode workflowNode = processPack.getWorkflowNode();
             if (workflowNode instanceof ApplicationNode) {
                 ApplicationNode applicationNode = (ApplicationNode) 
workflowNode;
                 // Workflow node can have one to many output ports and each 
output port can have one to many links
                 for (OutPort outPort : applicationNode.getOutputPorts()) {
-//                    for (Edge edge : outPort.getOutputLinks()) {
-//                        WorkflowUtil.copyValues(outPort.getOutputObject(), 
edge.getToPort().getInputObject());
-//                        tempWfNodeSet.add(edge.getToPort().getNode());
-//                    }
-                }
-
-                for (WorkflowNode node : tempWfNodeSet) {
-                    if (node.isReady()) {
-                        waitingList.remove(node);
-                        readList.add(node);
+                    for (OutputDataObjectType outputDataObjectType : 
taskOutputEvent.getOutput()) {
+                        if 
(outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                            
outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                            break;
+                        }
+                    }
+                    for (Edge edge : outPort.getOutEdges()) {
+                        WorkflowUtil.copyValues(outPort.getOutputObject(), 
edge.getToPort().getInputObject());
+                        if (edge.getToPort().getNode().isReady()) {
+                            addToReadyQueue(edge.getToPort().getNode());
+                        }
                     }
                 }
             }
             processingQueue.remove(taskId);
-            processReadyList();
         }
 
     }
@@ -238,8 +243,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
     @Subscribe
     public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
         String taskId = taskStatus.getTaskIdentity().getTaskId();
-        WfNodeContainer wfNodeContainer = processingQueue.get(taskId);
-        if (wfNodeContainer != null) {
+        ProcessPack processPack = processingQueue.get(taskId);
+        if (processPack != null) {
             WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
             switch (taskStatus.getState()) {
                 case WAITING:
@@ -247,25 +252,25 @@ public class SimpleWorkflowInterpreter implements 
Runnable{
                 case STARTED:
                     break;
                 case PRE_PROCESSING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
                     break;
                 case INPUT_DATA_STAGING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
                     break;
                 case OUTPUT_DATA_STAGING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
                     break;
                 case EXECUTING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTING);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING);
                     break;
                 case POST_PROCESSING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
                     break;
                 case COMPLETED:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTED);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED);
                     break;
                 case FAILED:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
                     break;
                 case UNKNOWN:
                     break;
@@ -273,14 +278,14 @@ public class SimpleWorkflowInterpreter implements 
Runnable{
                     break;
                 case CANCELED:
                 case CANCELING:
-                    
wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    
processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
                     break;
                 default:
                     break;
             }
             if (wfNodeState != WorkflowNodeState.UNKNOWN) {
                 try {
-                    
updateWorkflowNodeStatus(wfNodeContainer.getWfNodeDetails(), wfNodeState);
+                    updateWorkflowNodeStatus(processPack.getWfNodeDetails(), 
wfNodeState);
                 } catch (RegistryException e) {
                     // TODO: handle this.
                 }
@@ -289,8 +294,107 @@ public class SimpleWorkflowInterpreter implements 
Runnable{
 
     }
 
+    /**
+     * Remove the workflow node from waiting queue and add it to the ready 
queue.
+     * @param workflowNode - Workflow Node
+     */
+    private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+        waitingList.remove(workflowNode.getNodeId());
+        readList.put(workflowNode.getNodeId(), workflowNode);
+    }
+
+    private void addToWaitingQueue(WorkflowNode workflowNode) {
+        waitingList.put(workflowNode.getNodeId(), workflowNode);
+    }
+
+    /**
+     * First remove the node from ready list and then add the WfNodeContainer 
to the process queue.
+     * Note that underline data structure of the process queue is a Map.
+     * @param processPack - has both workflow and correspond 
workflowNodeDetails and TaskDetails
+     */
+    private synchronized void addToProcessingQueue(ProcessPack processPack) {
+        readList.remove(processPack.getWorkflowNode().getNodeId());
+        processingQueue.put(processPack.getTaskDetails().getTaskID(), 
processPack);
+    }
+
+    private synchronized void addToCompleteQueue(ProcessPack processPack) {
+        processingQueue.remove(processPack.getTaskDetails().getTaskID());
+        completeList.put(processPack.getTaskDetails().getTaskID(), 
processPack);
+    }
+
+
     @Override
     public void run() {
         // TODO: Auto generated method body.
+        try {
+            launchWorkflow();
+            while (!(waitingList.isEmpty() && readList.isEmpty())) {
+                processReadyList();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    class TempPublisher implements Runnable {
+        private TaskDetails tempTaskDetails;
+        private EventBus tempEventBus;
+
+        public TempPublisher(TaskDetails tempTaskDetails, EventBus 
tempEventBus) {
+            this.tempTaskDetails = tempTaskDetails;
+            this.tempEventBus = tempEventBus;
+        }
+
+        @Override
+        public void run() {
+            try {
+                TaskIdentifier identifier = new 
TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
+                TaskStatusChangeEvent statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.WAITING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.STARTED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new 
TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+
+                List<InputDataObjectType> applicationInputs = 
tempTaskDetails.getApplicationInputs();
+                List<OutputDataObjectType> applicationOutputs = 
tempTaskDetails.getApplicationOutputs();
+                log.info("**************   Task output change event fired for 
application id :" + tempTaskDetails.getApplicationId());
+                if (tempTaskDetails.getApplicationId().equals("Add") || 
tempTaskDetails.getApplicationId().equals("Add_2")) {
+                    
applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue())
 +
+                            
Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if 
(tempTaskDetails.getApplicationId().equals("Subtract")) {
+                    
applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue())
 -
+                            
Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if 
(tempTaskDetails.getApplicationId().equals("Multiply")) {
+                    
applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue())
 *
+                            
Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                }
+                TaskOutputChangeEvent taskOutputChangeEvent = new 
TaskOutputChangeEvent(applicationOutputs, identifier);
+                eventBus.post(taskOutputChangeEvent);
+
+            } catch (InterruptedException e) {
+                log.error("Thread was interrupted while sleeping");
+            }
+
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
----------------------------------------------------------------------
diff --git 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
deleted file mode 100644
index e0cebd6..0000000
--- 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.ariavata.simple.workflow.engine;
-
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-
-public class WfNodeContainer {
-    private WorkflowNode workflowNode;
-    private WorkflowNodeDetails wfNodeDetails;
-
-    public WfNodeContainer(WorkflowNode workflowNode, WorkflowNodeDetails 
wfNodeDetails) {
-        this.workflowNode = workflowNode;
-        this.wfNodeDetails = wfNodeDetails;
-    }
-
-    public WorkflowNode getWorkflowNode() {
-        return workflowNode;
-    }
-
-    public void setWorkflowNode(WorkflowNode workflowNode) {
-        this.workflowNode = workflowNode;
-    }
-
-    public WorkflowNodeDetails getWfNodeDetails() {
-        return wfNodeDetails;
-    }
-
-    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
-        this.wfNodeDetails = wfNodeDetails;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
index 71d0288..d4bbad3 100644
--- 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
+++ 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
@@ -21,12 +21,21 @@
 
 package org.apache.ariavata.simple.workflow.engine;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
 
 public class WorkflowUtil {
 
     public static InputDataObjectType copyValues(InputDataObjectType 
fromInputObj, InputDataObjectType toInputObj){
+        if (toInputObj == null) {
+            // TODO : throw an error
+        }
         toInputObj.setValue(fromInputObj.getValue());
         if (fromInputObj.getApplicationArgument() != null
                 && !fromInputObj.getApplicationArgument().trim().equals("")) {
@@ -40,4 +49,5 @@ public class WorkflowUtil {
         return inputData;
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index 2f912b3..f419ae2 100644
--- 
a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ 
b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -67,7 +67,8 @@ public class WorkflowInputNodeImpl implements 
WorkflowInputNode {
 
     @Override
     public boolean isReady() {
-        return inputDataObjectType.getValue() != null && 
!inputDataObjectType.getValue().equals("");
+        return (inputDataObjectType.getValue() != null && 
!inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
     }
 
     @Override

Reply via email to