http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java ---------------------------------------------------------------------- diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java deleted file mode 100644 index 291da99..0000000 --- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java +++ /dev/null @@ -1,1547 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2007 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package net.sf.taverna.t2.provenance.lineageservice; - -import static java.util.Collections.synchronizedList; -import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY; -import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML; -import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.iterationToString; -import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.parentProcess; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE; -import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE; - -import java.beans.ExceptionListener; -import java.beans.XMLEncoder; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.sql.Blob; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; -import 
java.util.concurrent.ConcurrentHashMap; - -import javax.sql.rowset.serial.SerialBlob; - -import net.sf.taverna.t2.provenance.item.DataProvenanceItem; -import net.sf.taverna.t2.provenance.item.DataflowRunComplete; -import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem; -import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem; -import net.sf.taverna.t2.provenance.item.IterationProvenanceItem; -import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem; -import net.sf.taverna.t2.provenance.item.ProvenanceItem; -import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem; -import net.sf.taverna.t2.provenance.lineageservice.utils.DataBinding; -import net.sf.taverna.t2.provenance.lineageservice.utils.DataLink; -import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode; -import net.sf.taverna.t2.provenance.lineageservice.utils.Port; -import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding; -import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorBinding; -import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorEnactment; -import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor; -import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils; -import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary; -import net.sf.taverna.t2.reference.T2Reference; -import net.sf.taverna.t2.workflowmodel.Dataflow; -import net.sf.taverna.t2.workflowmodel.DataflowInputPort; -import net.sf.taverna.t2.workflowmodel.DataflowOutputPort; -import net.sf.taverna.t2.workflowmodel.Datalink; -import net.sf.taverna.t2.workflowmodel.InputPort; -import net.sf.taverna.t2.workflowmodel.MergeInputPort; -import net.sf.taverna.t2.workflowmodel.MergeOutputPort; -import net.sf.taverna.t2.workflowmodel.OutputPort; -import net.sf.taverna.t2.workflowmodel.Processor; -import net.sf.taverna.t2.workflowmodel.ProcessorInputPort; -import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort; -import 
net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
import net.sf.taverna.t2.workflowmodel.processor.activity.NestedDataflow;

//import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.jdom.input.SAXBuilder;
import org.jdom.output.XMLOutputter;

import uk.org.taverna.scufl2.api.io.WorkflowBundleIO;

/**
 * Receives provenance events from the T2 enactor, records the static workflow
 * structure (processors, ports, datalinks) and the dynamic run data (processor
 * enactments, port bindings) through a {@link ProvenanceWriter}.
 *
 * @author Paolo Missier
 */
public class EventProcessor {
	/**
	 * A map of UUIDs of the originating processor to the ProcBinding object
	 * that contains its parameters
	 */
	private Map<String, ProcessorBinding> procBindingMap = new ConcurrentHashMap<>();

	/** A map of child ids to their parents in the hierarchy of events:
	 * workflow -> process -> processor -> activity -> iteration
	 */
	private Map<String, String> parentChildMap = new ConcurrentHashMap<>();

	private static Logger logger = Logger.getLogger(EventProcessor.class);

	// Synthetic processor names used for a workflow's own input/output ports
	// when the workflow is top level (nested workflows use their external name).
	private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
	private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";

	private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first
	private volatile String workflowRunId = null; // unique run ID. set when we see the first event of type "process"

	String topLevelDataflowName = null;
	String topLevelDataflowID = null;

	// child workflow id -> parent workflow id, filled while recursing into
	// nested dataflows in processDataflowStructure()
	Map<String, String> wfNestingMap = new ConcurrentHashMap<>();

	// all input bindings are accumulated here so they can be "backpatched" (see backpatching() )
	List<PortBinding> allInputVarBindings = synchronizedList(new ArrayList<PortBinding>());

	// dedicated class for processing WorkflowData events which carry workflow output info
	private WorkflowDataProcessor wfdp;
	private ProvenanceWriter pw = null;
	private ProvenanceQuery pq = null;

	// "processorName/i:portName" or "processorName/o:portName" -> Port,
	// built by makePortMapping() and consumed by lookupPort()
	private HashMap<String, Port> mapping;

	// iteration event id -> enactment record, used to decide add vs. update
	private Map<String, ProcessorEnactment> processorEnactmentMap = new ConcurrentHashMap<>();

	// cache of ProvenanceProcessor records keyed by their identifier
	private Map<String, ProvenanceProcessor> processorMapById = new ConcurrentHashMap<>();

	private WorkflowBundleIO io;

	// Backpatching temporarily disabled
	private static final boolean backpatching = false;

	/**
	 * Minimal constructor; writer/query/data-processor collaborators are left
	 * unset. NOTE(review): methods such as processWorkflowStructure dereference
	 * pw/pq/wfdp — presumably they are injected later; confirm against callers.
	 */
	public EventProcessor(WorkflowBundleIO io) {
		this.io = io;
	}

	/**
	 * Fully-wired constructor.
	 *
	 * @param pw writer used to persist provenance records
	 * @param pq query interface over already-persisted provenance
	 * @param wfdp pre-processor for workflow-data events
	 * @param io workflow bundle reader/writer
	 * @throws SQLException
	 * @throws ClassNotFoundException
	 * @throws IllegalAccessException
	 * @throws InstantiationException
	 */
	public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq,
			WorkflowDataProcessor wfdp, WorkflowBundleIO io) throws InstantiationException,
			IllegalAccessException, ClassNotFoundException, SQLException {
		this.pw = pw;
		this.pq = pq;
		this.wfdp = wfdp;
		this.io = io;

		//logger.setLevel((Level) Level.INFO);
	}

	/**
	 * this is the new version that makes use of the T2 deserializer
	 * populate static portion of the DB<br/>
	 * the static structure may already be in the DB -- this is detected as a duplicate top-level workflow ID.
- * In this case, we skip this processing altogether - * @param content - * is a serialized dataflow (XML) -- this is parsed using the T2 - * Deserializer - * @return the workflowRunId for this workflow structure - */ - public String processWorkflowStructure(ProvenanceItem provenanceItem) { - /* - * this flag is set to prevent processing of separate - * workflowProvenanceItems that describe nested workflows. the - * processing of all nested workflows is done as part of the very first - * workflowProvenanceItem that we receive, which is self-consistent. so - * we ignore all others - */ - if (workflowStructureDone) - return null; - WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) provenanceItem; - setWorkflowRunId(wpi.getIdentifier()); - workflowStructureDone = true; - return processWorkflowStructure(wpi.getDataflow()); - } - - public String processWorkflowStructure(Dataflow df) { - topLevelDataflowName = df.getLocalName(); - topLevelDataflowID = df.getIdentifier(); - - // check whether we already have this WF in the DB - List<String> workflowIds = null; - try { - workflowIds = pq.getAllworkflowIds(); - } catch (SQLException e) { - logger.warn("Problem processing workflow structure", e); - } - - if (workflowIds == null || workflowIds.contains(topLevelDataflowID)) { - // not already in the DB - logger.info("new workflow structure with ID " + topLevelDataflowID); - ProvenanceProcessor provProc = new ProvenanceProcessor(); - provProc.setIdentifier(UUID.randomUUID().toString()); - provProc.setProcessorName(topLevelDataflowName); - provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY); - provProc.setWorkflowId(topLevelDataflowID); - provProc.setTopLevelProcessor(true); - // record the top level dataflow as a processor in the DB - try { - pw.addProcessor(provProc); - // pw.addProcessor(topLevelDataflowName, DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true); // true -> is top level - } catch (SQLException e) { - logger.warn("Can't add processor " + topLevelDataflowID, 
e); - } - } - - return processDataflowStructure(df, topLevelDataflowID, df.getLocalName()); // null: no external name given to top level dataflow - } - - private Blob serialize(Dataflow df) { - Element serializeDataflow = null;xmlSerializer.serializeDataflow(df);//FIXME - String dataflowString = null; - try { - XMLOutputter outputter = new XMLOutputter(); - StringWriter stringWriter = new StringWriter(); - outputter.output(serializeDataflow, stringWriter); - dataflowString = stringWriter.toString(); - } catch (java.io.IOException e) { - logger.error("Could not serialise dataflow", e); - // FIXME Bad Exception handling! - } - return new SerialBlob(dataflowString.getBytes("UTF-8")); - } - - /** - * note: this method can be called as part of a recursion on sub-workflows - * - * @param df - * @param dataflowID - * the UUID for the entire dataflow (may be a sub-dataflow) - * @param localName - * the external name of the dataflow. Null if this is top level, - * not null if a sub-dataflow - * @return the workflowRunId for this workflow structure - */ - private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) { - String localWorkflowRunID = getWorkflowRunId(); - - //dataflowDepth++; - - try { - // check whether we already have this WF in the DB - boolean alreadyInDb; - try { - List<String> workflowIds = pq.getAllworkflowIds(); - alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID); - } catch (SQLException e) { - logger.warn("Problem processing dataflow structure for " + dataflowID, e); - alreadyInDb = false; - } - - // add workflow ID -- this is NOT THE SAME AS the workflowRunId - - /* - * this could be a nested workflow -- in this case, override its - * workflowRunId with that of its parent - */ - if (!alreadyInDb) { - String parentDataflow = wfNestingMap.get(dataflowID); - Blob blob = serialize(df); - if (parentDataflow == null) { - // this is a top level dataflow description - pw.addWFId(dataflowID, null, 
externalName, blob); // set its dataflowID with no parent - - } else { - // we are processing a nested workflow structure - logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow); - - pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent - - // override workflowRunId to point to top level -- UNCOMMENTED PM 9/09 CHECK - localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId(); - } - } - // Log the run itself - pw.addWorkflowRun(dataflowID, localWorkflowRunID); - - // add processors along with their variables - List<Port> vars = new ArrayList<Port>(); - for (Processor p : df.getProcessors()) { - String pName = p.getLocalName(); - - //CHECK get type of first activity and set this as the type of the processor itself - List<? extends Activity<?>> activities = p.getActivityList(); - - if (! alreadyInDb) { - ProvenanceProcessor provProc; - String pType = null; - if (activities != null && !activities.isEmpty()) - pType = activities.get(0).getClass().getCanonicalName(); - provProc = new ProvenanceProcessor(); - provProc.setIdentifier(UUID.randomUUID().toString()); - provProc.setProcessorName(pName); - provProc.setFirstActivityClassName(pType); - provProc.setWorkflowId(dataflowID); - provProc.setTopLevelProcessor(false); - - pw.addProcessor(provProc); - - //pw.addProcessor(pName, pType, dataflowID, false); // false: not a top level processor - - /* - * add all input ports for this processor as input variables - */ - for (ProcessorInputPort ip : p.getInputPorts()) { - Port inputVar = new Port(); - inputVar.setIdentifier(UUID.randomUUID().toString()); - inputVar.setProcessorId(provProc.getIdentifier()); - inputVar.setProcessorName(pName); - inputVar.setWorkflowId(dataflowID); - inputVar.setPortName(ip.getName()); - inputVar.setDepth(ip.getDepth()); - inputVar.setInputPort(true); - vars.add(inputVar); - } - - /* - * add all output ports for this 
processor as output - * variables - */ - for (ProcessorOutputPort op : p.getOutputPorts()) { - Port outputVar = new Port(); - outputVar.setIdentifier(UUID.randomUUID().toString()); - outputVar.setProcessorName(pName); - outputVar.setProcessorId(provProc.getIdentifier()); - outputVar.setWorkflowId(dataflowID); - outputVar.setPortName(op.getName()); - outputVar.setDepth(op.getDepth()); - outputVar.setInputPort(false); - - vars.add(outputVar); - } - } - - /* - * check for nested structures: if the activity is - * DataflowActivity then this processor is a nested workflow; - * make an entry into wfNesting map with its ID and recurse on - * the nested workflow - */ - - if (activities != null) - for (Activity<?> a : activities) { - if (!(a instanceof NestedDataflow)) - continue; - - Dataflow nested = ((NestedDataflow) a) - .getNestedDataflow(); - wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent - - // RECURSIVE CALL - processDataflowStructure(nested, - nested.getIdentifier(), p.getLocalName()); - } - } // end for each processor - - // add inputs to entire dataflow - String pName = INPUT_CONTAINER_PROCESSOR; // overridden -- see below - - /* - * check whether we are processing a nested workflow. in this case - * the input vars are not assigned to the INPUT processor but to the - * containing dataflow - */ - if (! alreadyInDb) { - if (externalName != null) // override the default if we are nested or someone external name is provided - pName = externalName; - - for (DataflowInputPort ip : df.getInputPorts()) { - Port inputVar = new Port(); - inputVar.setIdentifier(UUID.randomUUID().toString()); - inputVar.setProcessorId(null); // meaning workflow port - inputVar.setProcessorName(pName); - inputVar.setWorkflowId(dataflowID); - inputVar.setPortName(ip.getName()); - inputVar.setDepth(ip.getDepth()); - inputVar.setInputPort(true); // CHECK PM modified 11/08 -- input vars are actually outputs of input processors... 
- - vars.add(inputVar); - } - - // add outputs of entire dataflow - pName = OUTPUT_CONTAINER_PROCESSOR; // overridden -- see below - - /* - * check whether we are processing a nested workflow. in this - * case the output vars are not assigned to the OUTPUT processor - * but to the containing dataflow - */ - if (externalName != null) // we are nested - pName = externalName; - - for (DataflowOutputPort op : df.getOutputPorts()) { - Port outputVar = new Port(); - outputVar.setIdentifier(UUID.randomUUID().toString()); - outputVar.setProcessorId(null); // meaning workflow port - outputVar.setProcessorName(pName); - outputVar.setWorkflowId(dataflowID); - outputVar.setPortName(op.getName()); - outputVar.setDepth(op.getDepth()); - outputVar.setInputPort(false); // CHECK PM modified 11/08 -- output vars are actually outputs of output processors... - vars.add(outputVar); - } - - pw.addPorts(vars, dataflowID); - makePortMapping(vars); - - /* - * add datalink records using the dataflow links retrieving the - * processor names requires navigating from links to source/sink - * and from there to the processors - */ - for (Datalink l : df.getLinks()) { - // TODO cover the case of datalinks from an input and to an output to the entire dataflow - - Port sourcePort = null; - Port destinationPort = null; - - OutputPort source = l.getSource(); - if (source instanceof ProcessorOutputPort) { - String sourcePname = ((ProcessorOutputPort) source) - .getProcessor().getLocalName(); - sourcePort = lookupPort(sourcePname, source.getName(), false); - } else if (source instanceof MergeOutputPort) { - // TODO: Handle merge output ports - } else - // Assume it is internal port from DataflowInputPort - sourcePort = lookupPort(externalName, source.getName(), true); - - InputPort sink = l.getSink(); - if (sink instanceof ProcessorInputPort) { - String sinkPname = ((ProcessorInputPort) sink) - .getProcessor().getLocalName(); - destinationPort = lookupPort(sinkPname, sink.getName(), true); - } else if 
(sink instanceof MergeInputPort) { - // TODO: Handle merge input ports - } else - // Assume it is internal port from DataflowOutputPort - destinationPort = lookupPort(externalName, sink.getName(), false); - - if (sourcePort != null && destinationPort != null) - pw.addDataLink(sourcePort, destinationPort, dataflowID); - else - logger.info("Can't record datalink " + l); - } - } - } catch (Exception e) { - logger.error("Problem processing provenance for dataflow", e); - } - - return dataflowID; - } - - private void makePortMapping(List<Port> ports) { - mapping = new HashMap<>(); - for (Port port: ports) { - String key = port.getProcessorName() - + (port.isInputPort() ? "/i:" : "/o:") + port.getPortName(); - mapping.put(key, port); - } - } - - private Port lookupPort(String processorName, String portName, boolean isInputPort) { - String key = processorName + (isInputPort ? "/i:" : "/o:") + portName; - return mapping.get(key); - } - - /** - * processes an elementary process execution event from T2. Collects info - * from events as they happen and sends them to the writer for processing - * when the iteration event is received. 
	 * Uses the map of procBindings to
	 * process event id and the map of child ids to parent ids to ensure that
	 * the correct proc binding is used
	 *
	 * @param provenanceItem the event to dispatch on, by its event type
	 * @param currentWorkflowID the workflow this event belongs to
	 */
	public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
		switch (provenanceItem.getEventType()) {
		case PROCESS_EVENT_TYPE: {
			String parentId = provenanceItem.getParentId(); // this is the workflowID
			String identifier = provenanceItem.getIdentifier(); // use this as workflowRunId if this is the top-level process

			parentChildMap.put(identifier, parentId);
			ProcessorBinding pb = new ProcessorBinding();
			pb.setWorkflowRunId(getWorkflowRunId());
			pb.setWorkflowId(currentWorkflowID);
			procBindingMap.put(identifier, pb);
			return;
		}
		case PROCESSOR_EVENT_TYPE: {
			String identifier = provenanceItem.getIdentifier();
			String parentId = provenanceItem.getParentId();
			String processID = provenanceItem.getProcessId(); // this is the external process ID

			// this has the weird form facade0:dataflowname:pname need to extract pname from here
			String[] processName = processID.split(":");
			procBindingMap.get(parentId).setProcessorName(
					processName[processName.length - 1]);
			// last component of composite name is the processor name

			parentChildMap.put(identifier, parentId);
			return;
		}
		case ACTIVITY_EVENT_TYPE: {
			String identifier = provenanceItem.getIdentifier();
			String parentId = provenanceItem.getParentId();
			// the activity's own identifier doubles as its class name here
			procBindingMap.get(parentChildMap.get(parentId))
					.setFirstActivityClassName(identifier);
			parentChildMap.put(identifier, parentId);
			return;
		}
		case ITERATION_EVENT_TYPE: {
			IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem) provenanceItem;
			if (iterationProvenanceItem.getParentIterationItem() != null)
				// Skipping pipelined outputs, we'll process the parent output later instead
				return;

			// traverse up to root to retrieve ProcBinding that was created when we saw the process event
			String activityID = provenanceItem.getParentId();
			String processorID = parentChildMap.get(activityID);
			String processID = parentChildMap.get(processorID);
			String iterationID = provenanceItem.getIdentifier();
			parentChildMap.put(iterationID, activityID);

			// reuse the enactment record if we have seen this iteration before
			ProcessorEnactment processorEnactment = processorEnactmentMap
					.get(iterationID);
			if (processorEnactment == null)
				processorEnactment = new ProcessorEnactment();

			ProcessorBinding procBinding = procBindingMap.get(processID);

			String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
					.getIteration()));
			procBinding.setIterationVector(itVector);

			processorEnactment.setEnactmentStarted(iterationProvenanceItem
					.getEnactmentStarted());
			processorEnactment.setEnactmentEnded(iterationProvenanceItem
					.getEnactmentEnded());
			processorEnactment.setWorkflowRunId(workflowRunId);
			processorEnactment.setIteration(itVector);

			// link this enactment to its parent enactment, if any (3 levels up
			// in the colon-separated process id -- see parentProcess())
			String processId = iterationProvenanceItem.getProcessId();
			String parentProcessId = parentProcess(processId, 3);
			if (parentProcessId != null) {
				ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
						.get(parentProcessId);
				if (parentProcEnact != null)
					processorEnactment
							.setParentProcessorEnactmentId(parentProcEnact
									.getProcessEnactmentId());
			}
			processorEnactment.setProcessEnactmentId(iterationProvenanceItem
					.getIdentifier());
			processorEnactment.setProcessIdentifier(processId);

			// resolve the ProvenanceProcessor: by name on first sight,
			// by cached/queried id afterwards
			ProvenanceProcessor provenanceProcessor;
			if (processorEnactment.getProcessorId() == null) {
				provenanceProcessor = pq.getProvenanceProcessorByName(
						currentWorkflowID, procBinding.getProcessorName());
				if (provenanceProcessor == null)
					// already logged warning
					return;
				processorMapById.put(provenanceProcessor.getIdentifier(),
						provenanceProcessor);
				processorEnactment.setProcessorId(provenanceProcessor
						.getIdentifier());
			} else {
				provenanceProcessor = processorMapById.get(processorEnactment
						.getProcessorId());
				if (provenanceProcessor == null) {
					provenanceProcessor = pq
							.getProvenanceProcessorById(processorEnactment
									.getProcessorId());
					processorMapById.put(provenanceProcessor.getIdentifier(),
							provenanceProcessor);
				}
			}

			InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
			OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();

			// register input/output bindings only once per enactment
			if (inputDataEl != null
					&& processorEnactment.getInitialInputsDataBindingId() == null) {
				processorEnactment
						.setInitialInputsDataBindingId(processDataBindings(
								inputDataEl, provenanceProcessor));
				processInput(inputDataEl, procBinding, currentWorkflowID);
			}

			if (outputDataEl != null
					&& processorEnactment.getFinalOutputsDataBindingId() == null) {
				processorEnactment
						.setFinalOutputsDataBindingId(processDataBindings(
								outputDataEl, provenanceProcessor));
				processOutput(outputDataEl, procBinding, currentWorkflowID);
			}

			try {
				if (processorEnactmentMap.containsKey(iterationID)) {
					getPw().updateProcessorEnactment(processorEnactment);
				} else {
					getPw().addProcessorEnactment(processorEnactment);
					processorEnactmentMap.put(iterationID, processorEnactment);
				}
			} catch (SQLException e) {
				logger.warn("Could not store processor enactment", e);
			}
			return;
		}
		case END_WORKFLOW_EVENT_TYPE: {
			DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
			// use this event to do housekeeping on the input/output varbindings

			// process the input and output values accumulated by WorkflowDataProcessor
			getWfdp().processTrees(completeEvent, getWorkflowRunId());

			reconcileLocalOutputs(provenanceItem.getWorkflowId());

			// a process id without ':' means this is the top-level workflow
			if (! provenanceItem.getProcessId().contains(":")) {
				// Top-level workflow finished
				// No longer needed, done by processTrees()
//				patchTopLevelnputs();

				workflowStructureDone = false; // CHECK reset for next run...
//				reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs
				getPw().closeCurrentModel(); // only real impl is for RDF
			}
			return;
		}
		case WORKFLOW_DATA_EVENT_TYPE: {
			// give this event to a WorkflowDataProcessor object for pre-processing
//			try {
			// TODO may generate an exception when the data is an error CHECK
			getWfdp().addWorkflowDataItem(provenanceItem);
//			} catch (NumberFormatException e) {
//				logger.error(e);
//			}
//			logger.info("Received workflow data - not processing");
			//FIXME not sure - needs to be stored somehow
			return;
		}
		case INVOCATION_STARTED_EVENT_TYPE: {
			InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
			ProcessorEnactment processorEnactment = processorEnactmentMap
					.get(startedItem.getParentId());
			if (processorEnactment == null) {
				logger.error("Could not find ProcessorEnactment for invocation "
						+ startedItem);
				return;
			}
			getWfdp().invocationProcessToProcessEnactment.put(
					startedItem.getInvocationProcessId(), processorEnactment);
			return;
		}
		case ERROR_EVENT_TYPE:
			//TODO process the error
			return;
		default:
			// TODO broken, should we throw something here?
			return;
		}
	}

	/**
	 * Registers one DataBinding row per port in the given data item, all
	 * sharing a freshly generated binding id.
	 *
	 * @return the generated data binding id (returned even if some individual
	 *         port bindings failed to resolve or store)
	 */
	private String processDataBindings(
			DataProvenanceItem provenanceItem, ProvenanceProcessor provenanceProcessor) {
		// TODO: Cache known provenaneItems and avoid registering again
		String dataBindingId = UUID.randomUUID().toString();
		boolean isInput = provenanceItem instanceof InputDataProvenanceItem;

		for (Entry<String, T2Reference> entry : provenanceItem.getDataMap().entrySet()) {
			DataBinding dataBinding = new DataBinding();
			dataBinding.setDataBindingId(dataBindingId);
			Port port = findPort(provenanceProcessor, entry.getKey(), isInput);
			if (port == null) {
				logger.warn("Could not find port for " + entry.getKey());
				continue;
			}
			dataBinding.setPort(port);
			dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString());
			dataBinding.setWorkflowRunId(workflowRunId);
			try {
				getPw().addDataBinding(dataBinding);
			} catch (SQLException e) {
				logger.warn("Could not register data binding for " + port, e);
			}
		}
		return dataBindingId;
	}

	/**
	 * Queries the DB for the single Port matching workflow/processor/port
	 * name/direction.
	 *
	 * @return the Port, or null when none (or more than one) matches or the
	 *         query fails; each case is logged
	 */
	private Port findPort(ProvenanceProcessor provenanceProcessor,
			String portName, boolean isInput) {
		// TODO: Query pr dataflow and cache
		Map<String, String> queryConstraints = new HashMap<>();
		queryConstraints.put("V.workflowId",
				provenanceProcessor.getWorkflowId());
		String processorName = provenanceProcessor.getProcessorName();
		queryConstraints.put("processorName", processorName);
		queryConstraints.put("portName", portName);
		queryConstraints.put("isInputPort", isInput ? "1" : "0");
		try {
			List<Port> vars = pq.getPorts(queryConstraints);
			if (vars.isEmpty()) {
				logger.warn("Can't find port " + portName + " in "
						+ processorName);
			} else if (vars.size() > 1) {
				logger.warn("Multiple matches for port " + portName + " in "
						+ processorName + ", got:" + vars);
			} else
				return vars.get(0);
		} catch (SQLException e) {
			logger.error(
					"Problem getting ports for processor: " + processorName
							+ " worflow: "
							+ provenanceProcessor.getWorkflowId(), e);
		}
		return null;
	}

	/**
	 * fills in the VBs for the global inputs -- this removes the need for explicit events
	 * that account for these value bindings...
	 */
	public void patchTopLevelnputs() {
		// for each input I to topLevelDataflow:
		//   pick first outgoing datalink with sink P:X
		//   copy value X to I -- this can be a collection, so copy everything

		// get all global input vars

//		logger.info("\n\n BACKPATCHING GLOBAL INPUTS with dataflowDepth = "+dataflowDepth+"*******\n");

		List<Port> inputs = null;
		try {
			inputs = getPq().getInputPorts(topLevelDataflowName, topLevelDataflowID);

			for (Port input : inputs) {

//				logger.info("global input: "+input.getVName());

				Map<String, String> queryConstraints = new HashMap<String, String>();

//				queryConstraints.put("sourcePortName", input.getVName());
//				queryConstraints.put("sourceProcessorName", input.getPName());
				queryConstraints.put("sourcePortId", input.getIdentifier());
				queryConstraints.put("workflowId", input.getWorkflowId());
				List<DataLink> outgoingDataLinks = getPq().getDataLinks(queryConstraints);

				// any datalink will do, use the first
				// (get(0) on an empty list is what the IndexOutOfBoundsException
				// catch below accounts for)
				String targetPname = outgoingDataLinks.get(0).getDestinationProcessorName();
				String targetVname = outgoingDataLinks.get(0).getDestinationPortName();

//				logger.info("copying values from ["+targetPname+":"+targetVname+"] for instance ID: ["+workflowRunId+"]");

				queryConstraints.clear();
				queryConstraints.put("V.portName", targetVname);
				queryConstraints.put("V.processorName", targetPname);
				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
				queryConstraints.put("V.workflowId", topLevelDataflowID);

				for (PortBinding vb : getPq().getPortBindings(queryConstraints)) {
					PortBinding inputPortBinding = new PortBinding(vb);

					// insert PortBinding back into VB with the global input portName
					inputPortBinding.setProcessorName(input.getProcessorName());
					inputPortBinding.setPortName(input.getPortName());
					try {
						getPw().addPortBinding(inputPortBinding);
					} catch (SQLException ex) {
						logger.info("Already logged port binding", ex);
					}
				}
			}
		} catch (SQLException e) {
			logger.warn("Patch top level inputs problem for provenance", e);
		} catch (IndexOutOfBoundsException e) {
			logger.error("Could not patch top level", e);
		}
	}

	/** Convenience wrapper: reconcile outputs of the top-level dataflow. */
	public void reconcileTopLevelOutputs() {
		reconcileLocalOutputs(topLevelDataflowID);
	}

	// PM added 23/4/09
	/**
	 * reconcile the top level outputs with the results from its immediate precedessors in the graph.<br/>
	 * various cases have to be considered: predecessors may include records that are not in the output,
	 * while the output may include nested list structures that are not in the precedessors. This method accounts
	 * for a 2-way reconciliation that considers all possible cases.<br/>
	 * at the end, outputs and their predecessors contain the same data.<p/>
	 * NOTE: if we assume that data values (URIs) are <em>always</em> unique then this is greatly simplified by just
	 * comparing two sets of value records by their URIs and reconciling them. But this is not the way it is done here
	 */
	public void reconcileLocalOutputs(String dataflowID) {
		/*
		for each output O

		for each variable V in predecessors(O)

		fetch all VB records for O into list OValues
		fetch all VB records for V into list Yalues

		compare OValues and VValues:
		it SHOULD be the case that OValues is a subset of YValues.
Under this assumption: - - for each vb in YValues: - - if there is a matching o in OValues then (vb may be missing collection information) - copy o to vb - else - if vb has no collection info && there is a matching tree node tn in OTree (use iteration index for the match) then - set vb to be in collection tb - copy vb to o - - finally copy all Collection records for O in OTree -- catch duplicate errors - */ - - Map<String, String> queryConstraints = new HashMap<>(); - - try { - // for each output O - for (Port output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID)) { - // collect all VBs for O -// String oPName = output.getPName(); -// String oVName = output.getVName(); -// queryConstraints.put("V.portName", oVName); -// queryConstraints.put("V.processorName", oPName); -// queryConstraints.put("VB.workflowRunId", workflowRunId); -// queryConstraints.put("V.workflowId", topLevelDataflowID); - -// List<PortBinding> OValues = pq.getPortBindings(queryConstraints); - - // find all records for the immediate precedessor Y of O - queryConstraints.clear(); -// queryConstraints.put("destinationPortName", output.getVName()); -// queryConstraints.put("destinationProcessorName", output.getPName()); - queryConstraints.put("destinationPortId", output.getIdentifier()); - queryConstraints.put("workflowId", output.getWorkflowId()); - List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints); - - // there can be only one -- but check that there is one! 
- if (incomingDataLinks.isEmpty()) - continue; - - String sourcePname = incomingDataLinks.get(0).getSourceProcessorName(); - String sourceVname = incomingDataLinks.get(0).getSourcePortName(); - - queryConstraints.clear(); - queryConstraints.put("V.portName", sourceVname); - queryConstraints.put("V.processorName", sourcePname); - queryConstraints.put("VB.workflowRunId", getWorkflowRunId()); - queryConstraints.put("V.workflowId", topLevelDataflowID); - - List<PortBinding> YValues = pq.getPortBindings(queryConstraints); - - // for each YValue look for a match in OValues - // (assume the YValues values are a superset of OValues)!) - - for (PortBinding yValue:YValues) { - // look for a matching record in PortBinding for output O - queryConstraints.clear(); - queryConstraints.put("V.portName", output.getPortName()); - queryConstraints.put("V.processorName", output.getProcessorName()); - queryConstraints.put("VB.workflowRunId", getWorkflowRunId()); - queryConstraints.put("V.workflowid", topLevelDataflowID); - queryConstraints.put("VB.iteration", yValue.getIteration()); - if (yValue.getCollIDRef()!= null) { - queryConstraints.put("VB.collIDRef", yValue.getCollIDRef()); - queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl())); - } - List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints); - - // result at most size 1 - if (!matchingOValues.isEmpty()) { - PortBinding oValue = matchingOValues.get(0); - - // copy collection info from oValue to yValue - yValue.setCollIDRef(oValue.getCollIDRef()); - yValue.setPositionInColl(oValue.getPositionInColl()); - - pw.updatePortBinding(yValue); - } else { - // copy the yValue to O - // insert PortBinding back into VB with the global output portName - yValue.setProcessorName(output.getProcessorName()); - yValue.setPortName(output.getPortName()); - pw.addPortBinding(yValue); - } - - } // for each yValue in YValues - - // copy all Collection records for O to Y - - // get all collections refs 
for O - queryConstraints.clear(); - queryConstraints.put("workflowRunId", getWorkflowRunId()); - queryConstraints.put("processorNameRef", output.getProcessorName()); - queryConstraints.put("portName", output.getPortName()); - - List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints); - - // insert back as collection refs for Y -- catch duplicates - for (NestedListNode nln:oCollections) { - nln.setProcessorName(sourcePname); - nln.setProcessorName(sourceVname); - - getPw().replaceCollectionRecord(nln, sourcePname, sourceVname); - } - - } // for each output var - - } catch (SQLException e) { - logger.warn("Problem reconciling top level outputs", e); - } - - } - - @SuppressWarnings("unchecked") - private void processOutput(OutputDataProvenanceItem provenanceItem, - ProcessorBinding procBinding, String currentWorkflowID) { - Element dataItemAsXML = getDataItemAsXML(provenanceItem); - List<Element> outputPorts = dataItemAsXML.getChildren("port"); - for (Element outputport : outputPorts) { - String portName = outputport.getAttributeValue("name"); - - // value type may vary - List<Element> valueElements = outputport.getChildren(); - if (valueElements != null && !valueElements.isEmpty()) { - Element valueEl = valueElements.get(0); // only really 1 child - - processPortBinding(valueEl, procBinding.getProcessorName(), - portName, procBinding.getIterationVector(), - getWorkflowRunId(), currentWorkflowID); - } - } - } - - /** - * this method reconciles values in varBindings across an datalink: Firstly, - * if vb's value is within a collection, _and_ it is copied from a value - * generated during a previous iteration, then this method propagates the - * list reference to that iteration value, which wouldn't have it. 
- * Conversely, if vb is going to be input to an iteration, then it's lost - * its containing list node, and we put it back in by looking at the - * corresponding predecessor - * - * @param vb - * @throws SQLException - */ - private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException { - logger.debug("backpatchIterationResults: start"); - for (PortBinding vb : newBindings) { - logger.debug("backpatchIterationResults: processing vb " - + vb.getProcessorName() + "/" + vb.getPortName() + "=" - + vb.getValue()); - - if (vb.getCollIDRef()!= null) // this is a member of a collection - logger.debug("...which is inside a collection "); - - // look for its antecedent - Map<String,String> queryConstraints = new HashMap<>(); - queryConstraints.put("destinationPortName", vb.getPortName()); - queryConstraints.put("destinationProcessorName", vb.getProcessorName()); - queryConstraints.put("workflowId", pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0)); // CHECK picking first element in list... - List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints); - - // there can be only one -- but check that there is one! 
- if (incomingDataLinks.isEmpty()) - return; - - String sourcePname = incomingDataLinks.get(0).getSourceProcessorName(); - String sourceVname = incomingDataLinks.get(0).getSourcePortName(); - - logger.debug("antecedent: "+sourcePname+":"+sourceVname); - - // get the varbindings for this port and select the one with the same iteration vector as its successor - queryConstraints.clear(); - queryConstraints.put("VB.portName", sourceVname); - queryConstraints.put("V.processorName", sourcePname); - queryConstraints.put("VB.value", vb.getValue()); - queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId()); - - // reconcile - for (PortBinding b : pq.getPortBindings(queryConstraints)) { - logger.debug("backpatching " + sourceVname + " " + sourcePname); - - if (vb.getCollIDRef() != null && b.getCollIDRef() == null) { - logger.debug("successor " + vb.getPortName() - + " is in collection " + vb.getCollIDRef() - + " but pred " + b.getPortName() + " is not"); - logger.debug("putting " + b.getPortName() - + " in collection " + vb.getCollIDRef() - + " at pos " + vb.getPositionInColl()); - b.setCollIDRef(vb.getCollIDRef()); - b.setPositionInColl(vb.getPositionInColl()); - getPw().updatePortBinding(b); - - } else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) { - logger.debug("successor " + vb.getPortName() - + " is NOT in collection but pred " - + b.getPortName() + " IS"); - logger.debug("putting " + vb.getPortName() - + " in collection " + b.getCollIDRef() + " at pos " - + b.getPositionInColl()); - vb.setCollIDRef(b.getCollIDRef()); - vb.setPositionInColl(b.getPositionInColl()); - getPw().updatePortBinding(vb); - } - } - } - } - - - /** - * create one new PortBinding record for each input port binding - * @param currentWorkflowID - */ - @SuppressWarnings("unchecked") - private void processInput(InputDataProvenanceItem provenanceItem, - ProcessorBinding procBinding, String currentWorkflowID) { - Element dataItemAsXML = getDataItemAsXML(provenanceItem); - int 
order = 0; - for (Element inputport : (List<Element>) dataItemAsXML.getChildren("port")) { - String portName = inputport.getAttributeValue("name"); - - try { - // add process order sequence to Port for this portName - Map<String, String> queryConstraints = new HashMap<>(); - queryConstraints.put("V.workflowId", currentWorkflowID); - queryConstraints.put("processorName", procBinding.getProcessorName()); - queryConstraints.put("portName", portName); - queryConstraints.put("isInputPort", "1"); - - Port v = getPq().getPorts(queryConstraints).get(0); - v.setIterationStrategyOrder(order++); - getPw().updatePort(v); - } catch (IndexOutOfBoundsException e) { - logger.error("Could not process input " + portName, e); - } catch (SQLException e1) { - logger.error("Could not process input " + portName, e1); - } - - // value type may vary - List<Element> valueElements = inputport.getChildren(); // hopefully - // in the right order... - if (valueElements != null && valueElements.size() > 0) { - Element valueEl = valueElements.get(0); // expect only 1 child - // processVarBinding(valueEl, processor, portName, iterationVector, - // dataflow); - - List<PortBinding> newBindings = processPortBinding(valueEl, - procBinding.getProcessorName(), portName, - procBinding.getIterationVector(), getWorkflowRunId(), - currentWorkflowID); - // this is a list whenever valueEl is of type list: in this case processVarBinding recursively - // processes all values within the collection, and generates one PortBinding record for each of them - - allInputVarBindings.addAll(newBindings); - - // // if the new binding involves list values, then check to see if they need to be propagated back to -// // results of iterations - - // Backpatching disabled as it is very inefficient and not needed - // for current Taverna usage - - try { - if (backpatching) - backpatchIterationResults(newBindings); - } catch (SQLException e) { - logger.warn("Problem with back patching iteration results", e); - } - } else { - if 
(valueElements != null) - logger.debug("port name " + portName + " " - + valueElements.size()); - else - logger.debug("valueElements is null for port name " - + portName); - } - } - } - - /** - * capture the default case where the value is not a list - * - * @param valueEl - * @param processorId - * @param portName - * @param iterationId - * @param workflowRunId - * @param currentWorkflowID - */ - private List<PortBinding> processPortBinding(Element valueEl, - String processorId, String portName, String iterationId, - String workflowRunId, String currentWorkflowID) { - // uses the defaults: - // collIdRef = null - // parentcollectionRef = null - // positionInCollection = 1 - return processPortBinding(valueEl, processorId, portName, null, 1, null, - iterationId, workflowRunId, null, currentWorkflowID); - } - - /** - * general case where value can be a list - * @param valueEl - * @param processorId - * @param portName - * @param collIdRef - * @param positionInCollection - * @param parentCollectionRef - * @param iterationId - * @param workflowRunId - * @param currentWorkflowID - */ - @SuppressWarnings("unchecked") - private List<PortBinding> processPortBinding(Element valueEl, - String processorId, String portName, String collIdRef, - int positionInCollection, String parentCollectionRef, - String iterationId, String workflowRunId, String itVector, - String currentWorkflowID) { - List<PortBinding> newBindings = new ArrayList<>(); - - String valueType = valueEl.getName(); - // logger.info("value element for " + processorId + ": " - // + valueType); - - String iterationVector = (itVector == null ? 
extractIterationVector(iterationId) - : itVector); - - PortBinding vb = new PortBinding(); - - vb.setWorkflowId(currentWorkflowID); - vb.setWorkflowRunId(workflowRunId); - vb.setProcessorName(processorId); - vb.setValueType(valueType); - vb.setPortName(portName); - vb.setCollIDRef(collIdRef); - vb.setPositionInColl(positionInCollection); - - newBindings.add(vb); - - if (valueType.equals("literal")) { - try { - vb.setIteration(iterationVector); - vb.setValue(valueEl.getAttributeValue("id")); - logger.debug("new input VB with workflowId="+currentWorkflowID+" processorId="+processorId+ - " valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+ - " position="+positionInCollection+" itvector="+iterationVector+ - " value="+vb.getValue()); - getPw().addPortBinding(vb); - } catch (SQLException e) { - logger.warn("Process Port Binding problem with provenance", e); - } - - } else if (valueType.equals("referenceSet")) { - vb.setIteration(iterationVector); - vb.setValue(valueEl.getAttributeValue("id")); - vb.setReference(valueEl.getChildText("reference")); - - logger.debug("new input VB with workflowId=" + currentWorkflowID - + " processorId=" + processorId + " valueType=" + valueType - + " portName=" + portName + " collIdRef=" + collIdRef - + " position=" + positionInCollection + " itvector=" - + iterationVector + " value=" + vb.getValue()); - - try { - getPw().addPortBinding(vb); - } catch (SQLException e) { - logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e); - // try to update the existing record instead using the current collection info - - getPw().updatePortBinding(vb); - } - - } else if (valueType.equals("list")) { - logger.debug("input of type list"); - - // add entries to the Collection and to the PortBinding tables - // list id --> Collection.collId - - String collId = valueEl.getAttributeValue("id"); - try { - parentCollectionRef = getPw().addCollection(processorId, collId, - parentCollectionRef, 
iterationVector, portName, - workflowRunId); - - // iterate over each list element - List<Element> listElements = valueEl.getChildren(); - - positionInCollection = 1; // also use this as a suffix to extend the iteration vector - - // extend iteration vector to account for additional levels within the list - - String originalIterationVector = iterationVector; - - // children can be any base type, including list itself -- so - // use recursion - for (Element el : listElements) { - if (originalIterationVector.length() > 2) // vector is not empty - iterationVector = originalIterationVector.substring(0, - originalIterationVector.length()-1) + ","+ - Integer.toString(positionInCollection-1) + "]"; - else - iterationVector = "["+ (positionInCollection-1) + "]"; - - newBindings.addAll(processPortBinding(el, processorId, - portName, collId, positionInCollection, - parentCollectionRef, iterationId, workflowRunId, - iterationVector, currentWorkflowID)); - - positionInCollection++; - } - } catch (SQLException e) { - logger.warn("Problem processing var binding", e); - } - } else if (valueType.equals("error")) { - vb.setIteration(iterationVector); - vb.setValue(valueEl.getAttributeValue("id")); - vb.setReference(valueEl.getChildText("reference")); - try { - getPw().addPortBinding(vb); - } catch (SQLException e) { - logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e); - // try to update the existing record instead using the current collection info - - getPw().updatePortBinding(vb); - } - } else { - logger.warn("unrecognized value type element for " - + processorId + ": " + valueType); - } - - return newBindings; - } - - - /** - * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...] 
- * <p/> - * now returns the vector itself -- this is still experimental - * - * @param iteration - * @return - */ - @Deprecated - String extractIterationVector(String iteration) { - return iteration; - } - - /** - * silly class to hold pairs of strings. any better way?? - * @author paolo - * - */ - class Pair { - String v1, v2; - - public Pair(String current, String workflowId) { - v1=current; v2=workflowId; - } - - /** - * @return the v1 - */ - public String getV1() { - return v1; - } - - /** - * @param v1 the v1 to set - */ - public void setV1(String v1) { - this.v1 = v1; - } - - /** - * @return the v2 - */ - public String getV2() { - return v2; - } - - /** - * @param v2 the v2 to set - */ - public void setV2(String v2) { - this.v2 = v2; - } - } - - @SuppressWarnings("deprecation") - public List<Pair> toposort(String dataflowName, String workflowRunId) throws SQLException { - -// String workflowId = pq.getworkflowIdForDataflow(dataflowName, workflowRunId); - String workflowId = pq.getWorkflowIdForExternalName(dataflowName); - - // fetch processors along with the count of their predecessors - Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(workflowRunId); - Map<String, List<String>> successorsOf = new HashMap<String, List<String>>(); -// List<String> procList = pq.getContainedProcessors(dataflowName, workflowRunId); - List<String> procList = pq.getContainedProcessors(dataflowName); - - for (String s:procList) { - List<String> successors = getPq().getSuccProcessors(s, workflowId, workflowRunId); - successorsOf.put(s, successors); - } - - List<Pair> sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, workflowId, workflowRunId); - - for (int i=0; i< sorted.size(); i++) { - String procName = sorted.get(i).getV1(); - - if (pq.isDataflow(procName) && !procName.equals(dataflowName)) { // handle weirdness: a dataflow is contained within itself.. 
- // recurse on procName - List<Pair> sortedSublist = toposort(procName, workflowRunId); - - // replace procName with sortedSublist in sorted - sorted.remove(i); - sorted.addAll(i, sortedSublist); - } - } - return sorted; - } - - - - /** - * @param procList - * @param predecessorsCount - * @param successorsOf - * @param workflowRunId - * @return - * @throws SQLException - */ - public List<Pair> tsort(List<String> procList, String dataflowName, - Map<String, Integer> predecessorsCount, - Map<String, List<String>> successorsOf, String workflowId, - String workflowRunId) throws SQLException { - List<Pair> l = new ArrayList<>(); // holds sorted elements - List<String> q = new ArrayList<>(); // temp queue - - // init queue with procList processors that have no predecessors - for (String proc:procList) - if (predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0 && - !proc.equals(dataflowName)) - q.add(proc); - - while (!q.isEmpty()) { - String current = q.remove(0); - l.add(new Pair(current, workflowId)); - - List<String> successors = successorsOf.get(current); - - if (successors == null) - continue; - - // reduce the number of predecessors to each of the successors by one - // NB we must traverse an additional datalink through a nested workflow input if the successor is a dataflow!! 
- for (String succ : successors) { - // decrease edge count for each successor processor - predecessorsCount.put(succ, predecessorsCount.get(succ) - 1); - - if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName)) - q.add(succ); - } - } // end loop on q - return l; - } - - @SuppressWarnings("deprecation") - public void propagateANL(String workflowRunId) throws SQLException { - String top = pq.getTopLevelDataflowName(workflowRunId); - - // ////////////////////// - // PHASE I: toposort the processors in the whole graph - // ////////////////////// - List<Pair> sorted = toposort(top, workflowRunId); - - List<String> sortedProcessors = new ArrayList<>(); - - for (Pair p : sorted) - sortedProcessors.add(p.getV1()); - - logger.debug("final sorted list of processors"); - for (Pair p : sorted) - logger.debug(p.getV1() + " in workflowId " + p.getV2()); - - // ////////////////////// - // PHASE II: traverse and set anl on each port - // ////////////////////// - - // // sorted processor names in L at this point - // // process them in order - for (Pair pnameInContext : sorted) { - // // process pname's inputs -- set ANL to be the DNL if not set in prior steps - String pname = pnameInContext.getV1(); - String workflowId = pnameInContext.getV2(); - - List<Port> inputs = getPq().getInputPorts(pname, workflowId); // null -> do not use instance (??) CHECK - - int totalANL = 0; - for (Port iv : inputs) { - - if (! 
iv.isResolvedDepthSet()) { - iv.setResolvedDepth(iv.getDepth()); - getPw().updatePort(iv); - } - - int delta_nl = iv.getResolvedDepth() - iv.getDepth(); - - // if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case - if (delta_nl < 0 ) delta_nl = 0;// CHECK iv.getTypedepth(); - - totalANL += delta_nl; - - // this should take care of the special case of the top level dataflow with inputs that have successors in the graph - // propagate this through all the links from this var -// List<Port> successors = getPq().getSuccVars(pname, iv.getVName(), workflowRunId); - -// for (Port v : successors) { -// v.setresolvedDepth(iv.getresolvedDepth()); -// getPw().updateVar(v); -// } - } - - // process pname's outputs -- set ANL based on the sum formula (see - // paper) - for (Port ov : getPq().getOutputPorts(pname, workflowId)) { - - ov.setResolvedDepth(ov.getDepth() + totalANL); - - logger.debug("anl for "+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL)); - getPw().updatePort(ov); - - // propagate this through all the links from this var - for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) { - List<Port> toBeProcessed = new ArrayList<>(); - toBeProcessed.add(v); - - if (v.getProcessorId() == null && v.isInputPort()) { // this is the input to a nested workflow -// String tempWorkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId); - String tempWorkflowId = pq - .getWorkflowIdForExternalName(v - .getProcessorName()); - List<Port> realSuccessors = getPq().getSuccPorts( - v.getProcessorName(), v.getPortName(), - tempWorkflowId); - - toBeProcessed.remove(0); - toBeProcessed.addAll(realSuccessors); - - } else if (v.getProcessorId() == null && !v.isInputPort()) { // this is the output to a nested workflow -// String tempworkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId); - List<Port> realSuccessors = getPq().getSuccPorts( - v.getProcessorName(), v.getPortName(), null); - - 
toBeProcessed.remove(0); - toBeProcessed.addAll(realSuccessors); - } - - for (Port v1 : toBeProcessed) { - v1.setResolvedDepth(ov.getResolvedDepth()); - logger.debug("anl for " + v1.getProcessorName() + ":" - + v1.getPortName() + " = " - + ov.getResolvedDepth()); - getPw().updatePort(v1); - } - } - } - } - } - - public void setPw(ProvenanceWriter pw) { - this.pw = pw; - } - - public ProvenanceWriter getPw() { - return pw; - } - - public void setPq(ProvenanceQuery pq) { - this.pq = pq; - } - - public ProvenanceQuery getPq() { - return pq; - } - - public void setWorkflowRunId(String workflowRunId) { - this.workflowRunId = workflowRunId; - } - - public String getWorkflowRunId() { - return workflowRunId; - } - - public void setWfdp(WorkflowDataProcessor wfdp) { - this.wfdp = wfdp; - } - - public WorkflowDataProcessor getWfdp() { - return wfdp; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java ---------------------------------------------------------------------- diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java deleted file mode 100644 index 975a83a..0000000 --- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java +++ /dev/null @@ -1,243 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2007 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package net.sf.taverna.t2.provenance.lineageservice; - -/** - * - * This Java bean holds a single provenance record, i.e., the finest element of - * a provenance graph that is stored in the provenance DB. Essentially this - * represents one data element (value) flowing through a port (vname) of a - * processor (pname), in the context of one run (wfInstance) of a workflow - * (wfname). The record may include an <b>iteration</b> vector, used when the - * same processor receives multiple values on the same port, as part of - * iterative processing. When the value belongs to a collection (a nested list), - * the <b>collIdRef</b> field contains a reference to that collection. - * - * @author Paolo Missier - * - */ -public class LineageQueryResultRecord { - private String workflowId; - private String processorName; - private String portName; - private String workflowRunId; - private String iteration; - private String value; // atomic or XML-formatted collection -- this is actually a reference to the value... 
- private String collIdRef; - private String parentCollIDRef; - private String resolvedValue; - private String type; // one of referenceSet, referenceSetCollection - boolean printResolvedValue; - boolean isInput; - boolean isCollection; - - @Override - public String toString() { - if (isCollection) { - return "COLLECTION: proc " + getProcessorName() + " var " - + getPortName() + " " + " iteration: " + getIteration() - + " value: " + getValue() + " collection id: " - + getCollectionT2Reference() + " parent collection: " - + getParentCollIDRef(); - } else if (printResolvedValue) - return "workflow " + getworkflowId() + " proc " - + getProcessorName() + " var " + getPortName() + " " - + " iteration: " + getIteration() + " value: " + getValue() - + " collection id: " + getCollectionT2Reference() - + " resolvedValue: " + getResolvedValue(); - else - return "workflow " + getworkflowId() + " proc " - + getProcessorName() + " var " + getPortName() + " " - + " iteration: " + getIteration() + " collection id: " - + getCollectionT2Reference() + " value: " + getValue(); - } - - /** - * @return the pname - */ - public String getProcessorName() { - return processorName; - } - /** - * @param pname the pname to set - */ - public void setProcessorName(String pname) { - this.processorName = pname; - } - /** - * @return the vname - */ - public String getPortName() { - return portName; - } - /** - * @param vname the vname to set - */ - public void setPortName(String vname) { - this.portName = vname; - } - /** - * @return the workflowRun - */ - public String getWorkflowRunId() { - return workflowRunId; - } - /** - * @param workflowRun the workflowRun to set - */ - public void setWorkflowRunId(String workflowRun) { - this.workflowRunId = workflowRun; - } - /** - * @return the value - */ - public String getValue() { - return value; - } - /** - * @param value the value to set - */ - public void setValue(String value) { - this.value = value; - } - /** - * @return the type - */ - public 
String getType() { - return type; - } - /** - * @param type the type to set - */ - public void setType(String type) { - this.type = type; - } - /** - * @return the iteration - */ - public String getIteration() { - return iteration; - } - /** - * @param iteration the iteration to set - */ - public void setIteration(String iteration) { - this.iteration = iteration; - } - /** - * @return the resolvedValue - */ - public String getResolvedValue() { - return resolvedValue; - } - /** - * @param resolvedValue the resolvedValue to set - */ - public void setResolvedValue(String resolvedValue) { - this.resolvedValue = resolvedValue; - } - - - public void setPrintResolvedValue(boolean printResolvedValue) { - this.printResolvedValue = printResolvedValue; - } - - - /** - * @return the isInput - */ - public boolean isInputPort() { - return isInput; - } - - - /** - * @param isInput the isInput to set - */ - public void setIsInputPort(boolean isInput) { - this.isInput = isInput; - } - - - /** - * @return the collIdRef - */ - public String getCollectionT2Reference() { - return collIdRef; - } - - - /** - * @param collIdRef the collIdRef to set - */ - public void setCollectionT2Reference(String collIdRef) { - this.collIdRef = collIdRef; - } - - - /** - * @return the isCollection - */ - public boolean isCollection() { - return isCollection; - } - - - /** - * @param isCollection the isCollection to set - */ - public void setCollection(boolean isCollection) { - this.isCollection = isCollection; - } - - - /** - * @return the parentCollIDRef - */ - public String getParentCollIDRef() { - return parentCollIDRef; - } - - - /** - * @param parentCollIDRef the parentCollIDRef to set - */ - public void setParentCollIDRef(String parentCollIDRef) { - this.parentCollIDRef = parentCollIDRef; - } - - - /** - * @return the workflowId - */ - public String getworkflowId() { - return workflowId; - } - - - /** - * @param workflowId the workflowId to set - */ - public void setWorkflowId(String workflowId) { 
- this.workflowId = workflowId; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java ---------------------------------------------------------------------- diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java deleted file mode 100644 index 58d7c3d..0000000 --- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java +++ /dev/null @@ -1,82 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2007 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package net.sf.taverna.t2.provenance.lineageservice; - -/** - * encapsulates an SQL query along with directives on how to interpret the - * results, i.e., which elements of the select clause are to be considered - * relevant. For instance when the query includes a join with Collection, the - * intent is that lineage should return the collection itself as opposed to any - * of its elements. - * - * @author paolo - * - */ -public class LineageSQLQuery { - private String vbQuery = null; - private String collQuery = null; - - /** =0 => use var values, >0 => use enclosing collection */ - private int depth = 0; - - /** - * @return the depth - */ - public int getdepth() { - return depth; - } - - /** - * @param depth - * the depth to set - */ - public void setdepth(int depth) { - this.depth = depth; - } - - /** - * @return the collQuery - */ - public String getCollQuery() { - return collQuery; - } - - /** - * @param collQuery the collQuery to set - */ - public void setCollQuery(String collQuery) { - this.collQuery = collQuery; - } - - /** - * @return the vbQuery - */ - public String getVbQuery() { - return vbQuery; - } - - /** - * @param vbQuery the vbQuery to set - */ - public void setVbQuery(String vbQuery) { - this.vbQuery = vbQuery; - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java ---------------------------------------------------------------------- diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java 
b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java deleted file mode 100644 index 621e351..0000000 --- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java +++ /dev/null @@ -1,199 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2007 The University of Manchester - * - * Modifications to the initial code base are copyright of their - * respective authors, or their employers as appropriate. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * as published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. 
- * - * You should have received a copy of the GNU Lesser General Public - * License along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 - ******************************************************************************/ -package net.sf.taverna.t2.provenance.lineageservice; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import net.sf.taverna.t2.provenance.connector.AbstractProvenanceConnector; -import net.sf.taverna.t2.provenance.item.ProvenanceItem; -import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem; -import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary; -import net.sf.taverna.t2.workflowmodel.Dataflow; - -import org.apache.log4j.Logger; - -/** - * Implemented by the database class that a {@link AbstractProvenanceConnector} - * implementation uses for storage purposes - * - * @author Paolo Missier - * @author Ian Dunlop - * - */ -//FIXME is this class really needed. Can't we just push the -//acceptRawProvanceEvent up into the ProvenanceConnector? 
public class Provenance {
	private static Logger logger = Logger.getLogger(Provenance.class);

	// Read-side and write-side access to the provenance store. The
	// EventProcessor-taking constructor pulls both from the EventProcessor,
	// but the setters below allow them to be injected independently (see the
	// FIXME near setPq).
	protected ProvenanceQuery pq;
	protected ProvenanceWriter pw;
	// Translates raw provenance events into provenance-store inserts.
	protected EventProcessor ep;

	// Configuration value for event persistence; only stored/exposed here,
	// never read by this class.
	private String saveEvents;

	// True until the first WORKFLOW_EVENT_TYPE item has been processed.
	// volatile gives visibility across threads, but the read-then-set in
	// processEvent is not atomic — NOTE(review): two concurrent "first"
	// workflow events could both take the first-structure branch; confirm
	// whether callers serialize workflow-structure events.
	private volatile boolean firstWorkflowStructure = true;

	/**
	 * @return whether the next workflow-structure event is the first one seen
	 */
	public boolean isFirstWorkflowStructure() {
		return firstWorkflowStructure;
	}

	public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
		this.firstWorkflowStructure = firstWorkflowStructure;
	}

	// Stack of workflow IDs, most recent at index 0. Wrapped in a
	// synchronizedList, and compound operations below additionally
	// synchronize on the list itself. NOTE(review): elements are pushed in
	// processEvent but the commented-out pop in the END_WORKFLOW branch means
	// nothing removes them any more — verify this is intentional.
	private List<String> workflowIDStack = Collections.synchronizedList(new ArrayList<String>());

	// instanceId -> dataflowId, populated on each workflow-structure event.
	// NOTE(review): written but never read within this class; presumably kept
	// for subclasses or external use — confirm before removing.
	private Map<String, String> workflowIDMap = new ConcurrentHashMap<String, String>();

	public Provenance() { }

	/**
	 * Builds a Provenance facade around an EventProcessor, taking the query
	 * and writer components from it.
	 *
	 * @param eventProcessor source of the ProvenanceQuery and ProvenanceWriter
	 */
	public Provenance(EventProcessor eventProcessor) {
		this.ep = eventProcessor;
		this.pq = ep.getPq();
		this.pw = ep.getPw();
	}

	/**
	 * Clears both the static (workflow structure) and dynamic (run data)
	 * portions of the provenance database.
	 *
	 * @throws SQLException if either clear operation fails
	 */
	public void clearDB() throws SQLException {
		getPw().clearDBStatic();
		getPw().clearDBDynamic();
	}

	/**
	 * @return the saveEvents
	 */
	public String getSaveEvents() {
		return saveEvents;
	}

	/**
	 * @param saveEvents
	 *            the saveEvents to set
	 */
	public void setSaveEvents(String saveEvents) {
		this.saveEvents = saveEvents;
	}

	// FIXME I think the provenance query and writer should both come from the
	// EventProcessor
	// seems silly setting the ep, pq and pw separately.
	public void setPq(ProvenanceQuery pq) {
		this.pq = pq;
	}

	public ProvenanceQuery getPq() {
		return pq;
	}

	public void setPw(ProvenanceWriter pw) {
		this.pw = pw;
	}

	public ProvenanceWriter getPw() {
		return pw;
	}

	public void setEp(EventProcessor ep) {
		this.ep = ep;
	}

	public EventProcessor getEp() {
		return ep;
	}

	/**
	 * maps each incoming event to an insert query into the provenance store
	 *
	 * @param eventType
	 * @param content
	 * @throws SQLException
	 * @throws IOException
	 */
	public void acceptRawProvenanceEvent(SharedVocabulary eventType,
			ProvenanceItem provenanceItem) throws SQLException, IOException {
		// Thin delegation; note the parameter order is swapped relative to
		// processEvent.
		processEvent(provenanceItem, eventType);
	}

	/**
	 * parse d and generate SQL insert calls into the provenance DB
	 *
	 * @param d
	 *            DOM for the event
	 * @param eventType
	 *            see {@link SharedVocabulary}
	 * @throws SQLException
	 * @throws IOException
	 */
	// NOTE(review): the first branch dispatches on the eventType parameter,
	// while the END_WORKFLOW branch consults provenanceItem.getEventType()
	// instead — confirm the two are always consistent for a given item.
	protected void processEvent(ProvenanceItem provenanceItem,
			SharedVocabulary eventType) throws SQLException, IOException {
		if (eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE)) {
			// process the workflow structure
			//workflowStartedMap.put()
			WorkflowProvenanceItem workflowProvenanceItem = (WorkflowProvenanceItem) provenanceItem;

			// Record the invocation start time for this workflow instance.
			getEp().getWfdp().workflowStarted.put(workflowProvenanceItem.getIdentifier(), workflowProvenanceItem.getInvocationStarted());
			if (isFirstWorkflowStructure()) {
				// First (top-level) workflow structure: persist it fully and
				// propagate annotations.
				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
				String instanceId = provenanceItem.getIdentifier();

				workflowIDMap.put(instanceId, dataflowId);
				// Flip the flag BEFORE processing so nested workflow events
				// seen during processing take the else-branch.
				setFirstWorkflowStructure(false);
				String processWorkflowStructure = getEp().processWorkflowStructure(provenanceItem);
				synchronized(workflowIDStack) {
					workflowIDStack.add(0,processWorkflowStructure);
				}

				// propagateANL = propagate the anticipated nesting levels
				// (depth info) through the stored structure — TODO confirm
				// against EventProcessor.
				getEp().propagateANL(provenanceItem.getIdentifier());
			} else {
				// Subsequent (nested) workflow structures: only record the
				// instance->dataflow mapping and push the dataflow ID; the
				// structure itself is not re-processed here.
				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
				String instanceId = provenanceItem.getIdentifier();

				workflowIDMap.put(instanceId, dataflowId);

				Dataflow df = workflowProvenanceItem.getDataflow();
				synchronized(workflowIDStack) {
					workflowIDStack.add(0,df.getIdentifier());
				}
			}
		} else if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
//			String currentWorkflowID = workflowIDStack.get(0);
//			workflowIDStack.remove(0);
			// The workflow ID now comes from the item's parent rather than
			// the stack (see commented-out code above).
			String currentWorkflowID = provenanceItem.getParentId();

			getEp().processProcessEvent(provenanceItem, currentWorkflowID);

		} else { // all other event types (iteration etc.)
			logger.debug("processEvent of type "+provenanceItem.getEventType()+" for item of type "+provenanceItem.getClass().getName());
			String currentWorkflowID = provenanceItem.getWorkflowId();
//			String currentWorkflowID = workflowIDMap.get(provenanceItem.getParentId());

			getEp().processProcessEvent(provenanceItem, currentWorkflowID);

//			getEp().processProcessEvent(provenanceItem, workflowIDStack.get(0));
		}
	}
}
