Implemented execution logic of the workflow data model
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/55319c96 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/55319c96 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/55319c96 Branch: refs/heads/master Commit: 55319c96d2c7ace7ab7b458dbd4fd259649d2e8e Parents: 27f6f1b Author: shamrath <[email protected]> Authored: Sun Feb 22 16:55:55 2015 -0500 Committer: shamrath <[email protected]> Committed: Sun Feb 22 16:55:55 2015 -0500 ---------------------------------------------------------------------- .../engine/SimpleWorkflowInterpreter.java | 92 +++++++++++++++++--- .../workflow/engine/WorkflowFactoryImpl.java | 4 +- .../simple/workflow/engine/WorkflowUtil.java | 10 +++ .../simple/workflow/engine/dag/port/InPort.java | 4 + .../workflow/engine/dag/port/InputPortIml.java | 16 +++- .../engine/parser/AiravataDefaultParser.java | 71 ++++++++++++--- 6 files changed, 169 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java index e122fa6..3c2596d 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java @@ -49,6 +49,7 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode; import org.apache.ariavata.simple.workflow.engine.dag.port.InPort; import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort; +import org.apache.ariavata.simple.workflow.engine.parser.AiravataDefaultParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class SimpleWorkflowInterpreter implements Runnable{ @@ -69,12 +71,18 @@ public class SimpleWorkflowInterpreter implements Runnable{ private String credentialToken; - private Map<String, WorkflowNode> readList = new HashMap<String, WorkflowNode>(); - private Map<String, WorkflowNode> waitingList = new HashMap<String, WorkflowNode>(); - private Map<String, ProcessPack> processingQueue = new HashMap<String, ProcessPack>(); + private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>(); + private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>(); private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>(); private Registry registry; private EventBus eventBus = new EventBus(); + private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>(); + + public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException { + setExperiment(experimentId); + this.credentialToken = credentialToken; + } public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) { // read the workflow file and build the topology to a DAG. Then execute that dag @@ -87,11 +95,14 @@ public class SimpleWorkflowInterpreter implements Runnable{ public void launchWorkflow() throws Exception { // process workflow input nodes - WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); - WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); +// WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance(); +// WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken); + WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken); + log.debug("Initialized workflow parser"); setWorkflowInputNodes(workflowParser.parse()); + log.debug("Parsed the workflow and got the workflow input nodes"); processWorkflowInputNodes(getWorkflowInputNodes()); - processReadyList(); +// processReadyList(); // process workflow application nodes // process workflow output nodes } @@ -100,10 +111,17 @@ public class SimpleWorkflowInterpreter implements Runnable{ private synchronized void processReadyList() { for (WorkflowNode readyNode : readList.values()) { try { + if (readyNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode; + completeWorkflowOutputs.add(wfOutputNode); + continue; + } WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode); TaskDetails process = getProcess(workflowNodeDetails); - addToProcessingQueue(new ProcessPack(readyNode, workflowNodeDetails, process)); - publishToProcessQueue(process); + ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process); + addToProcessingQueue(processPack); +// publishToProcessQueue(process); + publishToProcessQueue(processPack); } catch (RegistryException e) { // FIXME : handle this exception } @@ -116,6 +134,41 @@ public class SimpleWorkflowInterpreter implements Runnable{ //TODO: publish to process queue. } + // TODO : remove this test method + private void publishToProcessQueue(ProcessPack process) { + WorkflowNode workflowNode = process.getWorkflowNode(); + if (workflowNode instanceof ApplicationNode) { + ApplicationNode applicationNode = (ApplicationNode) workflowNode; + List<InPort> inputPorts = applicationNode.getInputPorts(); + if (applicationNode.getNodeName().equals("Add")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else if (applicationNode.getNodeName().equals("Multiply")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else if (applicationNode.getNodeName().equals("Subtract")) { + applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf( + Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue()))); + } else { + throw new RuntimeException("Invalid Application name"); + } + + for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) { + WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject()); + if (edge.getToPort().getNode().isReady()) { + addToReadyQueue(edge.getToPort().getNode()); + } else { + addToWaitingQueue(edge.getToPort().getNode()); + } + } + } else if (workflowNode instanceof WorkflowOutputNode) { + WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode; + throw new RuntimeException("Workflow output node in processing queue"); + } + + processingQueue.remove(process.getTaskDetails().getTaskID()); + } + private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException { // create workflow taskDetails from workflowNodeDetails TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails); @@ -165,13 +218,16 @@ public class SimpleWorkflowInterpreter implements Runnable{ Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>(); for (WorkflowInputNode wfInputNode : wfInputNodes) { if (wfInputNode.isReady()) { + log.debug("Workflow node : " + wfInputNode.getNodeId() + " is ready to execute"); for (Edge edge : wfInputNode.getOutPort().getOutEdges()) { - edge.getToPort().setInputObject( - WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject())); + edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue()); if (edge.getToPort().getNode().isReady()) { addToReadyQueue(edge.getToPort().getNode()); + log.debug("Added workflow node : " + edge.getToPort().getNode().getNodeId() + " to the readyQueue"); } else { addToWaitingQueue(edge.getToPort().getNode()); + log.debug("Added workflow node " + edge.getToPort().getNode().getNodeId() + " to the waitingQueue"); + } } } @@ -213,6 +269,8 @@ public class SimpleWorkflowInterpreter implements Runnable{ @Subscribe public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){ String taskId = taskOutputEvent.getTaskIdentity().getTaskId(); + log.debug("Task Output changed event received for workflow node : " + + taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId); ProcessPack processPack = processingQueue.get(taskId); Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>(); if (processPack != null) { @@ -236,6 +294,7 @@ public class SimpleWorkflowInterpreter implements Runnable{ } } processingQueue.remove(taskId); + log.debug("removed task from processing queue : " + taskId); } } @@ -257,12 +316,12 @@ public class SimpleWorkflowInterpreter implements Runnable{ case INPUT_DATA_STAGING: processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING); break; - case OUTPUT_DATA_STAGING: - processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); - break; case EXECUTING: processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING); break; + case OUTPUT_DATA_STAGING: + processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); + break; case POST_PROCESSING: processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING); break; @@ -327,15 +386,22 @@ public class SimpleWorkflowInterpreter implements Runnable{ public void run() { // TODO: Auto generated method body. try { + log.debug("Launching workflow"); launchWorkflow(); while (!(waitingList.isEmpty() && readList.isEmpty())) { processReadyList(); + Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } } + private void setExperiment(String experimentId) throws RegistryException { + experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId); + log.debug("Retrieve Experiment for experiment id : " + experimentId); + } + class TempPublisher implements Runnable { private TaskDetails tempTaskDetails; http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java index a6173ac..dd84df0 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java @@ -33,13 +33,15 @@ public class WorkflowFactoryImpl implements WorkflowFactory { private WorkflowParser workflowParser; + private static final String synch = "sync"; + private WorkflowFactoryImpl(){ } public static WorkflowFactoryImpl getInstance() { if (workflowFactoryImpl == null) { - synchronized (workflowFactoryImpl) { + synchronized (synch) { if (workflowFactoryImpl == null) { workflowFactoryImpl = new WorkflowFactoryImpl(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java index d4bbad3..688b170 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java @@ -41,6 +41,9 @@ public class WorkflowUtil { && !fromInputObj.getApplicationArgument().trim().equals("")) { toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument()); } + if (toInputObj.getType() == null) { + toInputObj.setType(fromInputObj.getType()); + } return fromInputObj; } @@ -50,4 +53,11 @@ public class WorkflowUtil { } + public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) { + if (outputObject == null) { + outputObject = new OutputDataObjectType(); + } + outputObject.setValue(inputObject.getValue()); + return outputObject; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java index c635bef..bac10ee 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java @@ -34,4 +34,8 @@ public interface InPort extends Port { public void addEdge(Edge edge); + public String getDefaultValue(); + + public void setDefaultValue(String defaultValue); + } http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java index 1971a1d..82160a9 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java @@ -26,10 +26,11 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode; public class InputPortIml implements InPort { private InputDataObjectType inputDataObjectType; - private boolean isSatisfy = false; + private boolean ready = false; private String portId; private Edge edge; private WorkflowNode node; + private String defaultValue; public InputPortIml(String portId) { this.portId = portId; @@ -38,6 +39,8 @@ public class InputPortIml implements InPort { @Override public void setInputObject(InputDataObjectType inputObject) { this.inputDataObjectType = inputObject; + ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")) + || !inputDataObjectType.isIsRequired(); } @Override @@ -56,8 +59,17 @@ public class InputPortIml implements InPort { } @Override + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + @Override public boolean isReady() { - return inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""); + return getInputObject() != null && inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""); } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java ---------------------------------------------------------------------- diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java index 39e422a..e7ac5cb 100644 --- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java +++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java @@ -21,6 +21,7 @@ package org.apache.ariavata.simple.workflow.engine.parser; +import com.sun.corba.se.pept.encoding.OutputObject; import org.airavata.appcatalog.cpi.AppCatalogException; import org.airavata.appcatalog.cpi.WorkflowCatalog; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; @@ -41,6 +42,7 @@ import org.apache.airavata.workflow.model.graph.GraphException; import org.apache.airavata.workflow.model.graph.Node; import org.apache.airavata.workflow.model.graph.impl.NodeImpl; import org.apache.airavata.workflow.model.graph.system.OutputNode; +import org.apache.airavata.workflow.model.graph.system.SystemDataPort; import org.apache.airavata.workflow.model.graph.ws.WSNode; import org.apache.airavata.workflow.model.graph.ws.WSPort; import org.apache.airavata.workflow.model.wf.Workflow; @@ -100,9 +102,6 @@ public class AiravataDefaultParser implements WorkflowParser { for (InputDataObjectType dataObjectType : experimentInputs) { inputDataMap.put(dataObjectType.getName(), dataObjectType); } - OutPort outPort = null; - InPort inPort = null; - Edge edge = null; for (Node gNode : gNodes) { wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName()); wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName())); @@ -133,13 +132,12 @@ public class AiravataDefaultParser implements WorkflowParser { dataPort = portContainer.getDataPort(); inPort = portContainer.getInPort(); Node node = dataPort.getNode(); - inPort.setInputObject(getInputDataObject(dataPort)); +// inPort.setInputObject(getInputDataObject(dataPort)); if (node instanceof WSNode) { WSNode wsNode = (WSNode) node; WorkflowNode wfNode = wfNodes.get(wsNode.getID()); if (wfNode == null) { - wfApplicationNode = new ApplicationNodeImpl(wsNode.getID(), - wsNode.getComponent().getApplication().getApplicationId()); + wfApplicationNode = createApplicationNode(wsNode); wfNodes.put(wfApplicationNode.getNodeId(), wfApplicationNode); nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode)); } else if (wfNode instanceof ApplicationNode) { @@ -152,7 +150,7 @@ public class AiravataDefaultParser implements WorkflowParser { }else if (node instanceof OutputNode) { OutputNode oNode = (OutputNode) node; - wfOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName()); + wfOutputNode = createWorkflowOutputNode(oNode); wfOutputNode.setInPort(inPort); wfNodes.put(wfOutputNode.getNodeId(), wfOutputNode); } @@ -161,18 +159,33 @@ public class AiravataDefaultParser implements WorkflowParser { } + private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) { + WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName()); + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + outputDataObjectType.setType(oNode.getParameterType()); + workflowOutputNode.setOutputObject(outputDataObjectType); + return workflowOutputNode; + } + + private ApplicationNode createApplicationNode(WSNode wsNode) { + ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(), + wsNode.getComponent().getApplication().getApplicationId()); +// wsNode.getComponent().getInputPorts() + return applicationNode; + } + private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) { OutPort outPort ; Edge edge; InPort inPort = null; List<PortContainer> portContainers = new ArrayList<PortContainer>(); for (DataPort dataPort : node.getOutputPorts()) { - outPort = new OutPortImpl(dataPort.getID()); + outPort = createOutPort(dataPort); for (DataEdge dataEdge : dataPort.getEdges()) { edge = new DirectedEdge(); edge.setFromPort(outPort); outPort.addEdge(edge); - inPort = getInPort(dataEdge.getToPort()); + inPort = createInPort(dataEdge.getToPort()); edge.setToPort(inPort); inPort.addEdge(edge); portContainers.add(new PortContainer(dataEdge.getToPort(), inPort)); @@ -181,7 +194,7 @@ public class AiravataDefaultParser implements WorkflowParser { if (wfNode instanceof WorkflowInputNode) { WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode; workflowInputNode.setOutPort(outPort); - }else if (wfNode instanceof ApplicationNode) { + } else if (wfNode instanceof ApplicationNode) { ApplicationNode applicationNode = ((ApplicationNode) wfNode); applicationNode.addOutPort(outPort); // applicationNode.addInPort(inPort); @@ -190,8 +203,42 @@ public class AiravataDefaultParser implements WorkflowParser { return portContainers; } - private InPort getInPort(DataPort toPort) { - return new InputPortIml(toPort.getID()); + private OutPort createOutPort(DataPort dataPort) { + OutPortImpl outPort = new OutPortImpl(dataPort.getID()); + OutputDataObjectType outputDataObjectType = new OutputDataObjectType(); + if (dataPort instanceof WSPort) { + WSPort wsPort = (WSPort) dataPort; + outputDataObjectType.setName(wsPort.getFromNode().getName()); + outputDataObjectType.setType(wsPort.getType()); + }else if (dataPort instanceof SystemDataPort) { + SystemDataPort sysPort = (SystemDataPort) dataPort; + outputDataObjectType.setName(sysPort.getFromNode().getName()); + outputDataObjectType.setType(sysPort.getType()); + } + + outPort.setOutputObject(outputDataObjectType); + return outPort; + } + + private InPort createInPort(DataPort toPort) { + InPort inPort = new InputPortIml(toPort.getID()); + InputDataObjectType inputDataObjectType = new InputDataObjectType(); + if (toPort instanceof WSPort) { + WSPort wsPort = (WSPort) toPort; + inputDataObjectType.setName(wsPort.getName()); + inputDataObjectType.setType(wsPort.getType()); + inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument()); + inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional()); + inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder()); + + inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue()); + }else if (toPort instanceof SystemDataPort) { + SystemDataPort sysPort = (SystemDataPort) toPort; + inputDataObjectType.setName(sysPort.getName()); + inputDataObjectType.setType(sysPort.getType()); + } + inPort.setInputObject(inputDataObjectType); + return inPort; } private InputDataObjectType getInputDataObject(DataPort dataPort) {
