http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
----------------------------------------------------------------------
diff --git 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
 
b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
deleted file mode 100644
index 291da99..0000000
--- 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
+++ /dev/null
@@ -1,1547 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- 
******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import static java.util.Collections.synchronizedList;
-import static 
net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
-import static 
net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML;
-import static 
net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.iterationToString;
-import static 
net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.parentProcess;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE;
-import static 
net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE;
-
-import java.beans.ExceptionListener;
-import java.beans.XMLEncoder;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.sql.Blob;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.sql.rowset.serial.SerialBlob;
-
-import net.sf.taverna.t2.provenance.item.DataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.DataflowRunComplete;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataLink;
-import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode;
-import net.sf.taverna.t2.provenance.lineageservice.utils.Port;
-import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorEnactment;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.InputPort;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-import net.sf.taverna.t2.workflowmodel.MergeOutputPort;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.NestedDataflow;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-//import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.Namespace;
-import org.jdom.input.SAXBuilder;
-import org.jdom.output.XMLOutputter;
-
-import uk.org.taverna.scufl2.api.io.WorkflowBundleIO;
-
-/**
- * @author Paolo Missier
- */
-public class EventProcessor {
-       /**
-        * A map of UUIDs of the originating processor to the ProcBinding object
-        * that contains its parameters
-        */
-       private Map<String, ProcessorBinding> procBindingMap = new 
ConcurrentHashMap<>();
-
-       /** A map of child ids to their parents in the hierarchy of events:
-        *  workflow -> process -> processor -> activity -> iteration
-        */
-       private Map<String, String> parentChildMap= new ConcurrentHashMap<>();
-
-       private static Logger logger = Logger.getLogger(EventProcessor.class);
-
-       private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
-       private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";
-
-       private volatile boolean workflowStructureDone = false; // used to 
inhibit processing of multiple workflow events -- we only need the first
-       private volatile String workflowRunId = null; // unique run ID. set 
when we see the first event of type "process"
-
-       String topLevelDataflowName = null;
-       String topLevelDataflowID   = null;
-
-       Map<String, String> wfNestingMap = new ConcurrentHashMap<>();
-
-       // all input bindings are accumulated here so they can be "backpatched" 
(see backpatching() )
-       List<PortBinding> allInputVarBindings = synchronizedList(new 
ArrayList<PortBinding>());
-
-       // dedicated class for processing WorkflowData events which carry 
workflow output info
-       private WorkflowDataProcessor  wfdp;
-       private ProvenanceWriter pw = null;
-       private ProvenanceQuery  pq = null;
-
-       private HashMap<String, Port> mapping;
-
-       private Map<String, ProcessorEnactment> processorEnactmentMap = new 
ConcurrentHashMap<>();
-
-       private Map<String, ProvenanceProcessor> processorMapById = new 
ConcurrentHashMap<>();
-
-       private WorkflowBundleIO io;
-
-       // Backpatching temporarily disabled
-       private static final boolean backpatching = false;
-
-       public EventProcessor(WorkflowBundleIO io) {
-               this.io = io;
-       }
-
-       /**
-        * @param pw
-        * @throws SQLException
-        * @throws ClassNotFoundException
-        * @throws IllegalAccessException
-        * @throws InstantiationException
-        *
-        */
-       public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq,
-                       WorkflowDataProcessor wfdp,WorkflowBundleIO io) throws 
InstantiationException,
-                       IllegalAccessException, ClassNotFoundException, 
SQLException {
-               this.pw = pw;
-               this.pq = pq;
-               this.wfdp = wfdp;
-               this.io = io;
-
-               //logger.setLevel((Level) Level.INFO);
-       }
-
-       /**
-        * this is the new version that makes use of the T2 deserializer
-        * populate static portion of the DB<br/>
-        * the static structure may already be in the DB -- this is detected as 
a duplicate top-level workflow ID.
-        * In this case, we skip this processing altogether
-        * @param content
-        *            is a serialized dataflow (XML) -- this is parsed using 
the T2
-        *            Deserializer
-        * @return the workflowRunId for this workflow structure
-        */
-       public String processWorkflowStructure(ProvenanceItem provenanceItem) {
-               /*
-                * this flag is set to prevent processing of separate
-                * workflowProvenanceItems that describe nested workflows. the
-                * processing of all nested workflows is done as part of the 
very first
-                * workflowProvenanceItem that we receive, which is 
self-consistent. so
-                * we ignore all others
-                */
-               if (workflowStructureDone)
-                       return null;
-               WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) 
provenanceItem;
-               setWorkflowRunId(wpi.getIdentifier());
-               workflowStructureDone = true;
-               return processWorkflowStructure(wpi.getDataflow());
-       }
-
-       public String processWorkflowStructure(Dataflow df) {
-               topLevelDataflowName = df.getLocalName();
-               topLevelDataflowID   = df.getIdentifier();
-
-               // check whether we already have this WF in the DB
-               List<String> workflowIds = null;
-               try {
-                       workflowIds = pq.getAllworkflowIds();
-               } catch (SQLException e) {
-                       logger.warn("Problem processing workflow structure", e);
-               }
-
-               if (workflowIds == null || 
workflowIds.contains(topLevelDataflowID)) {
-                       // not already in the DB
-                       logger.info("new workflow structure with ID " + 
topLevelDataflowID);
-                       ProvenanceProcessor provProc = new 
ProvenanceProcessor();
-                       provProc.setIdentifier(UUID.randomUUID().toString());
-                       provProc.setProcessorName(topLevelDataflowName);
-                       provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY);
-                       provProc.setWorkflowId(topLevelDataflowID);
-                       provProc.setTopLevelProcessor(true);
-                       // record the top level dataflow as a processor in the 
DB
-                       try {
-                               pw.addProcessor(provProc);
-                               // pw.addProcessor(topLevelDataflowName, 
DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true);  // true -> is top level
-                       } catch (SQLException e) {
-                               logger.warn("Can't add processor " + 
topLevelDataflowID, e);
-                       }
-               }
-
-               return processDataflowStructure(df, topLevelDataflowID, 
df.getLocalName());  // null: no external name given to top level dataflow
-       }
-
-       private Blob serialize(Dataflow df) {
-               Element serializeDataflow = 
null;xmlSerializer.serializeDataflow(df);//FIXME
-               String dataflowString = null;
-               try {
-                   XMLOutputter outputter = new XMLOutputter();
-                   StringWriter stringWriter = new StringWriter();
-                   outputter.output(serializeDataflow, stringWriter);
-                   dataflowString = stringWriter.toString();
-               } catch (java.io.IOException e) {
-                   logger.error("Could not serialise dataflow", e);
-                   // FIXME Bad Exception handling!
-               }
-               return new SerialBlob(dataflowString.getBytes("UTF-8"));
-       }
-
-       /**
-        * note: this method can be called as part of a recursion on 
sub-workflows
-        * 
-        * @param df
-        * @param dataflowID
-        *            the UUID for the entire dataflow (may be a sub-dataflow)
-        * @param localName
-        *            the external name of the dataflow. Null if this is top 
level,
-        *            not null if a sub-dataflow
-        * @return the workflowRunId for this workflow structure
-        */
-       private String processDataflowStructure(Dataflow df, String dataflowID, 
String externalName) {
-               String localWorkflowRunID = getWorkflowRunId();
-
-               //dataflowDepth++;
-
-               try {
-                       // check whether we already have this WF in the DB
-                       boolean alreadyInDb;
-                       try {
-                               List<String> workflowIds = 
pq.getAllworkflowIds();
-                               alreadyInDb = workflowIds != null && 
workflowIds.contains(dataflowID);
-                       } catch (SQLException e) {
-                               logger.warn("Problem processing dataflow 
structure for " + dataflowID, e);
-                               alreadyInDb = false;
-                       }
-
-                       // add workflow ID -- this is NOT THE SAME AS the 
workflowRunId
-
-                       /*
-                        * this could be a nested workflow -- in this case, 
override its
-                        * workflowRunId with that of its parent
-                        */
-                       if (!alreadyInDb) {
-                               String parentDataflow = 
wfNestingMap.get(dataflowID);
-                               Blob blob = serialize(df);
-                               if (parentDataflow == null) {
-                                       // this is a top level dataflow 
description
-                                       pw.addWFId(dataflowID, null, 
externalName, blob); // set its dataflowID with no parent
-
-                               } else {
-                                       // we are processing a nested workflow 
structure
-                                       logger.debug("dataflow "+dataflowID+" 
with external name "+externalName+" is nested within "+parentDataflow);
-
-                                       pw.addWFId(dataflowID, parentDataflow, 
externalName, blob); // set its dataflowID along with its parent
-
-                                       // override workflowRunId to point to 
top level -- UNCOMMENTED PM 9/09  CHECK
-                                       localWorkflowRunID = 
pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId();
-                               }
-                       }
-                       // Log the run itself
-                       pw.addWorkflowRun(dataflowID, localWorkflowRunID);
-
-                       // add processors along with their variables
-                       List<Port> vars = new ArrayList<Port>();
-                       for (Processor p : df.getProcessors()) {
-                               String pName = p.getLocalName();
-
-                               //CHECK get type of first activity and set this 
as the type of the processor itself
-                               List<? extends Activity<?>> activities = 
p.getActivityList();
-
-                               if (! alreadyInDb) {
-                                       ProvenanceProcessor provProc;
-                                       String pType = null;
-                                       if (activities != null && 
!activities.isEmpty())
-                                               pType = 
activities.get(0).getClass().getCanonicalName();
-                                       provProc = new ProvenanceProcessor();
-                                       
provProc.setIdentifier(UUID.randomUUID().toString());
-                                       provProc.setProcessorName(pName);
-                                       
provProc.setFirstActivityClassName(pType);
-                                       provProc.setWorkflowId(dataflowID);
-                                       provProc.setTopLevelProcessor(false);
-
-                                       pw.addProcessor(provProc);
-
-                                       //pw.addProcessor(pName, pType, 
dataflowID, false);  // false: not a top level processor
-
-                                       /*
-                                        * add all input ports for this 
processor as input variables
-                                        */
-                                       for (ProcessorInputPort ip : 
p.getInputPorts()) {
-                                               Port inputVar = new Port();
-                                               
inputVar.setIdentifier(UUID.randomUUID().toString());
-                                               
inputVar.setProcessorId(provProc.getIdentifier());
-                                               
inputVar.setProcessorName(pName);
-                                               
inputVar.setWorkflowId(dataflowID);
-                                               
inputVar.setPortName(ip.getName());
-                                               
inputVar.setDepth(ip.getDepth());
-                                               inputVar.setInputPort(true);
-                                               vars.add(inputVar);
-                                       }
-
-                                       /*
-                                        * add all output ports for this 
processor as output
-                                        * variables
-                                        */
-                                       for (ProcessorOutputPort op : 
p.getOutputPorts()) {
-                                               Port outputVar = new Port();
-                                               
outputVar.setIdentifier(UUID.randomUUID().toString());
-                                               
outputVar.setProcessorName(pName);
-                                               
outputVar.setProcessorId(provProc.getIdentifier());
-                                               
outputVar.setWorkflowId(dataflowID);
-                                               
outputVar.setPortName(op.getName());
-                                               
outputVar.setDepth(op.getDepth());
-                                               outputVar.setInputPort(false);
-
-                                               vars.add(outputVar);
-                                       }
-                               }
-
-                               /*
-                                * check for nested structures: if the activity 
is
-                                * DataflowActivity then this processor is a 
nested workflow;
-                                * make an entry into wfNesting map with its ID 
and recurse on
-                                * the nested workflow
-                                */
-
-                               if (activities != null)
-                                       for (Activity<?> a : activities) {
-                                               if (!(a instanceof 
NestedDataflow))
-                                                       continue;
-
-                                               Dataflow nested = 
((NestedDataflow) a)
-                                                               
.getNestedDataflow();
-                                               
wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent
-
-                                               // RECURSIVE CALL
-                                               processDataflowStructure(nested,
-                                                               
nested.getIdentifier(), p.getLocalName());
-                                       }
-                       } // end for each processor
-
-                       // add inputs to entire dataflow
-                       String pName = INPUT_CONTAINER_PROCESSOR;  // 
overridden -- see below
-
-                       /*
-                        * check whether we are processing a nested workflow. 
in this case
-                        * the input vars are not assigned to the INPUT 
processor but to the
-                        * containing dataflow
-                        */
-                       if (! alreadyInDb) {
-                               if (externalName != null) // override the 
default if we are nested or someone external name is provided
-                                       pName = externalName;
-
-                               for (DataflowInputPort ip : df.getInputPorts()) 
{
-                                       Port inputVar = new Port();
-                                       
inputVar.setIdentifier(UUID.randomUUID().toString());
-                                       inputVar.setProcessorId(null); // 
meaning workflow port
-                                       inputVar.setProcessorName(pName);
-                                       inputVar.setWorkflowId(dataflowID);
-                                       inputVar.setPortName(ip.getName());
-                                       inputVar.setDepth(ip.getDepth());
-                                       inputVar.setInputPort(true);  // CHECK 
PM modified 11/08 -- input vars are actually outputs of input processors...
-
-                                       vars.add(inputVar);
-                               }
-
-                               // add outputs of entire dataflow
-                               pName = OUTPUT_CONTAINER_PROCESSOR;  // 
overridden -- see below
-
-                               /*
-                                * check whether we are processing a nested 
workflow. in this
-                                * case the output vars are not assigned to the 
OUTPUT processor
-                                * but to the containing dataflow
-                                */
-                               if (externalName != null) // we are nested
-                                       pName = externalName;
-
-                               for (DataflowOutputPort op : 
df.getOutputPorts()) {
-                                       Port outputVar = new Port();
-                                       
outputVar.setIdentifier(UUID.randomUUID().toString());
-                                       outputVar.setProcessorId(null); // 
meaning workflow port
-                                       outputVar.setProcessorName(pName);
-                                       outputVar.setWorkflowId(dataflowID);
-                                       outputVar.setPortName(op.getName());
-                                       outputVar.setDepth(op.getDepth());
-                                       outputVar.setInputPort(false);  // 
CHECK PM modified 11/08 -- output vars are actually outputs of output 
processors...
-                                       vars.add(outputVar);
-                               }
-
-                               pw.addPorts(vars, dataflowID);
-                               makePortMapping(vars);
-
-                               /*
-                                * add datalink records using the dataflow 
links retrieving the
-                                * processor names requires navigating from 
links to source/sink
-                                * and from there to the processors
-                                */
-                               for (Datalink l : df.getLinks()) {
-                                       // TODO cover the case of datalinks 
from an input and to an output to the entire dataflow
-
-                                       Port sourcePort = null;
-                                       Port destinationPort = null;
-
-                                       OutputPort source = l.getSource();
-                                       if (source instanceof 
ProcessorOutputPort) {
-                                               String sourcePname = 
((ProcessorOutputPort) source)
-                                                               
.getProcessor().getLocalName();
-                                               sourcePort = 
lookupPort(sourcePname, source.getName(), false);
-                                       } else if (source instanceof 
MergeOutputPort) {
-                                               // TODO: Handle merge output 
ports
-                                       } else
-                                               // Assume it is internal port 
from DataflowInputPort
-                                               sourcePort = 
lookupPort(externalName, source.getName(), true);
-
-                                       InputPort sink = l.getSink();
-                                       if (sink instanceof ProcessorInputPort) 
{
-                                               String sinkPname = 
((ProcessorInputPort) sink)
-                                                               
.getProcessor().getLocalName();
-                                               destinationPort = 
lookupPort(sinkPname, sink.getName(), true);
-                                       } else if (sink instanceof 
MergeInputPort) {
-                                               // TODO: Handle merge input 
ports
-                                       } else
-                                               // Assume it is internal port 
from DataflowOutputPort
-                                               destinationPort = 
lookupPort(externalName, sink.getName(), false);
-
-                                       if (sourcePort != null && 
destinationPort != null)
-                                               pw.addDataLink(sourcePort, 
destinationPort, dataflowID);
-                                       else
-                                               logger.info("Can't record 
datalink " + l);
-                               }
-                       }
-               } catch (Exception e) {
-                       logger.error("Problem processing provenance for 
dataflow", e);
-               }
-
-               return dataflowID;
-       }
-
-       private void makePortMapping(List<Port> ports) {
-               mapping = new HashMap<>();
-               for (Port port: ports) {
-                       String key = port.getProcessorName()
-                                       + (port.isInputPort() ? "/i:" : "/o:") 
+ port.getPortName();
-                       mapping.put(key, port);
-               }
-       }
-
-       private Port lookupPort(String processorName, String portName, boolean 
isInputPort) {
-               String key = processorName + (isInputPort ? "/i:" : "/o:") + 
portName;
-               return mapping.get(key);
-       }
-
-       /**
-        * processes an elementary process execution event from T2. Collects 
info
-        * from events as they happen and sends them to the writer for 
processing
-        * when the iteration event is received. Uses the map of procBindings to
-        * process event id and the map of child ids to parent ids to ensure 
that
-        * the correct proc binding is used
-        * @param currentWorkflowID
-        *
-        * @param d
-        * @param context
-        */
-       public void processProcessEvent(ProvenanceItem provenanceItem, String 
currentWorkflowID) {
-               switch (provenanceItem.getEventType()) {
-               case PROCESS_EVENT_TYPE: {
-                       String parentId = provenanceItem.getParentId();  // 
this is the workflowID
-                       String identifier = provenanceItem.getIdentifier();  // 
use this as workflowRunId if this is the top-level process
-                       
-                       parentChildMap.put(identifier, parentId);
-                       ProcessorBinding pb = new ProcessorBinding();
-                       pb.setWorkflowRunId(getWorkflowRunId());
-                       pb.setWorkflowId(currentWorkflowID);
-                       procBindingMap.put(identifier, pb);
-                       return;
-               }
-               case PROCESSOR_EVENT_TYPE: {
-                       String identifier = provenanceItem.getIdentifier();
-                       String parentId = provenanceItem.getParentId();
-                       String processID = provenanceItem.getProcessId(); // 
this is the external process ID
-
-                       // this has the weird form facade0:dataflowname:pname  
need to extract pname from here
-                       String[] processName = processID.split(":");
-                       procBindingMap.get(parentId).setProcessorName(
-                                       processName[processName.length - 1]);
-                       // 3rd component of composite name
-
-                       parentChildMap.put(identifier, parentId);
-                       return;
-               }
-               case ACTIVITY_EVENT_TYPE: {
-                       String identifier = provenanceItem.getIdentifier();
-                       String parentId = provenanceItem.getParentId();
-                       procBindingMap.get(parentChildMap.get(parentId))
-                                       .setFirstActivityClassName(identifier);
-                       parentChildMap.put(identifier, parentId);
-                       return;
-               }
-               case ITERATION_EVENT_TYPE: {
-                       IterationProvenanceItem iterationProvenanceItem = 
(IterationProvenanceItem)provenanceItem;
-                       if (iterationProvenanceItem.getParentIterationItem() != 
null)
-                               // Skipping pipelined outputs, we'll process 
the parent output later instead
-                               return;
-
-                       // traverse up to root to retrieve ProcBinding that was 
created when we saw the process event
-                       String activityID = provenanceItem.getParentId();
-                       String processorID = parentChildMap.get(activityID);
-                       String processID = parentChildMap.get(processorID);
-                       String iterationID = provenanceItem.getIdentifier();
-                       parentChildMap.put(iterationID, activityID);
-
-                       ProcessorEnactment processorEnactment = 
processorEnactmentMap
-                                       .get(iterationID);
-                       if (processorEnactment == null)
-                               processorEnactment = new ProcessorEnactment();
-
-                       ProcessorBinding procBinding = 
procBindingMap.get(processID);
-
-                       String itVector = 
extractIterationVector(iterationToString(iterationProvenanceItem
-                                       .getIteration()));
-                       procBinding.setIterationVector(itVector);
-
-                       
processorEnactment.setEnactmentStarted(iterationProvenanceItem
-                                       .getEnactmentStarted());
-                       
processorEnactment.setEnactmentEnded(iterationProvenanceItem
-                                       .getEnactmentEnded());
-                       processorEnactment.setWorkflowRunId(workflowRunId);
-                       processorEnactment.setIteration(itVector);
-
-                       String processId = 
iterationProvenanceItem.getProcessId();
-                       String parentProcessId = parentProcess(processId, 3);
-                       if (parentProcessId != null) {
-                               ProcessorEnactment parentProcEnact = 
getWfdp().invocationProcessToProcessEnactment
-                                               .get(parentProcessId);
-                               if (parentProcEnact != null)
-                                       processorEnactment
-                                                       
.setParentProcessorEnactmentId(parentProcEnact
-                                                                       
.getProcessEnactmentId());
-                       }
-                       
processorEnactment.setProcessEnactmentId(iterationProvenanceItem
-                                       .getIdentifier());
-                       processorEnactment.setProcessIdentifier(processId);
-
-                       ProvenanceProcessor provenanceProcessor;
-                       if (processorEnactment.getProcessorId() == null) {
-                               provenanceProcessor = 
pq.getProvenanceProcessorByName(
-                                               currentWorkflowID, 
procBinding.getProcessorName());
-                               if (provenanceProcessor == null)
-                                       // already logged warning
-                                       return;
-                               
processorMapById.put(provenanceProcessor.getIdentifier(),
-                                               provenanceProcessor);
-                               
processorEnactment.setProcessorId(provenanceProcessor
-                                               .getIdentifier());
-                       } else {
-                               provenanceProcessor = 
processorMapById.get(processorEnactment
-                                               .getProcessorId());
-                               if (provenanceProcessor == null) {
-                                       provenanceProcessor = pq
-                                                       
.getProvenanceProcessorById(processorEnactment
-                                                                       
.getProcessorId());
-                                       
processorMapById.put(provenanceProcessor.getIdentifier(),
-                                                       provenanceProcessor);
-                               }
-                       }
-
-                       InputDataProvenanceItem inputDataEl = 
iterationProvenanceItem.getInputDataItem();
-                       OutputDataProvenanceItem outputDataEl = 
iterationProvenanceItem.getOutputDataItem();
-
-                       if (inputDataEl != null
-                                       && 
processorEnactment.getInitialInputsDataBindingId() == null) {
-                               processorEnactment
-                                               
.setInitialInputsDataBindingId(processDataBindings(
-                                                               inputDataEl, 
provenanceProcessor));
-                               processInput(inputDataEl, procBinding, 
currentWorkflowID);
-                       }
-
-                       if (outputDataEl != null
-                                       && 
processorEnactment.getFinalOutputsDataBindingId() == null) {
-                               processorEnactment
-                                               
.setFinalOutputsDataBindingId(processDataBindings(
-                                                               outputDataEl, 
provenanceProcessor));
-                               processOutput(outputDataEl, procBinding, 
currentWorkflowID);
-                       }
-
-                       try {
-                               if 
(processorEnactmentMap.containsKey(iterationID)) {
-                                       
getPw().updateProcessorEnactment(processorEnactment);
-                               } else {
-                                       
getPw().addProcessorEnactment(processorEnactment);
-                                       processorEnactmentMap.put(iterationID, 
processorEnactment);
-                               }
-                       } catch (SQLException e) {
-                               logger.warn("Could not store processor 
enactment", e);
-                       }
-                       return;
-               }
-               case END_WORKFLOW_EVENT_TYPE: {
-                       DataflowRunComplete completeEvent = 
(DataflowRunComplete) provenanceItem;
-                       // use this event to do housekeeping on the 
input/output varbindings
-
-                       // process the input and output values accumulated by 
WorkflowDataProcessor
-                       getWfdp().processTrees(completeEvent, 
getWorkflowRunId());
-
-                       reconcileLocalOutputs(provenanceItem.getWorkflowId());
-
-                       if (! provenanceItem.getProcessId().contains(":")) {
-                               // Top-level workflow finished
-                               // No longer needed, done by processTrees()
-//                             patchTopLevelnputs();
-
-                               workflowStructureDone = false; // CHECK reset 
for next run...
-//                             reconcileTopLevelOutputs(); // Done by 
reconcileLocalOutputs
-                               getPw().closeCurrentModel();  // only real impl 
is for RDF
-                       }
-                       return;
-               }
-               case WORKFLOW_DATA_EVENT_TYPE: {
-                       // give this event to a WorkflowDataProcessor object 
for pre-processing
-                       //                      try {
-                       // TODO may generate an exception when the data is an 
error CHECK
-                       getWfdp().addWorkflowDataItem(provenanceItem);
-                       //                      } catch (NumberFormatException 
e) {
-                       //                      logger.error(e);
-                       //                      }
-                       //                      logger.info("Received workflow 
data - not processing");
-                       //FIXME not sure  - needs to be stored somehow
-                       return;
-               }
-               case INVOCATION_STARTED_EVENT_TYPE: {
-                       InvocationStartedProvenanceItem startedItem = 
(InvocationStartedProvenanceItem) provenanceItem;
-                       ProcessorEnactment processorEnactment = 
processorEnactmentMap
-                                       .get(startedItem.getParentId());
-                       if (processorEnactment == null) {
-                               logger.error("Could not find ProcessorEnactment 
for invocation "
-                                               + startedItem);
-                               return;
-                       }
-                       getWfdp().invocationProcessToProcessEnactment.put(
-                                       startedItem.getInvocationProcessId(), 
processorEnactment);
-                       return;
-               }
-               case ERROR_EVENT_TYPE:
-                       //TODO process the error
-                       return;
-               default:
-                       // TODO broken, should we throw something here?
-                       return;
-               }
-       }
-
-       private String processDataBindings(
-                       DataProvenanceItem provenanceItem, ProvenanceProcessor 
provenanceProcessor) {
-               // TODO: Cache known provenaneItems and avoid registering again
-               String dataBindingId = UUID.randomUUID().toString();
-               boolean isInput = provenanceItem instanceof 
InputDataProvenanceItem;
-
-               for (Entry<String, T2Reference> entry : 
provenanceItem.getDataMap().entrySet()) {
-                       DataBinding dataBinding = new DataBinding();
-                       dataBinding.setDataBindingId(dataBindingId);
-                       Port port = findPort(provenanceProcessor, 
entry.getKey(), isInput); // findPort
-                       if (port == null) {
-                               logger.warn("Could not find port for " + 
entry.getKey());
-                               continue;
-                       }
-                       dataBinding.setPort(port);
-                       
dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString());
-                       dataBinding.setWorkflowRunId(workflowRunId);
-                       try {
-                               getPw().addDataBinding(dataBinding);
-                       } catch (SQLException e) {
-                               logger.warn("Could not register data binding 
for " + port, e);
-                       }
-               }
-               return dataBindingId;
-       }
-
-       private Port findPort(ProvenanceProcessor provenanceProcessor,
-                       String portName, boolean isInput) {
-               // TODO: Query pr dataflow and cache
-               Map<String, String> queryConstraints = new HashMap<>();
-               queryConstraints.put("V.workflowId",
-                               provenanceProcessor.getWorkflowId());
-               String processorName = provenanceProcessor.getProcessorName();
-               queryConstraints.put("processorName", processorName);
-               queryConstraints.put("portName", portName);
-               queryConstraints.put("isInputPort", isInput ? "1" : "0");
-               try {
-                       List<Port> vars = pq.getPorts(queryConstraints);
-                       if (vars.isEmpty()) {
-                               logger.warn("Can't find port " + portName + " 
in "
-                                               + processorName);
-                       } else if (vars.size() > 1) {
-                               logger.warn("Multiple matches for port " + 
portName + " in "
-                                               + processorName + ", got:" + 
vars);
-                       } else
-                               return vars.get(0);
-               } catch (SQLException e) {
-                       logger.error(
-                                       "Problem getting ports for processor: " 
+ processorName
-                                                       + " worflow: "
-                                                       + 
provenanceProcessor.getWorkflowId(), e);
-               }
-               return null;
-       }
-
-
-       /**
-        * fills in the VBs for the global inputs -- this removes the need for 
explicit events
-        * that account for these value bindings...
-        */
-       public void patchTopLevelnputs() {
-
-               // for each input I to topLevelDataflow:
-               // pick first outgoing datalink with sink P:X
-               // copy value X to I -- this can be a collection, so copy 
everything
-
-               // get all global input vars
-
-               //              logger.info("\n\n BACKPATCHING GLOBAL INPUTS 
with dataflowDepth = "+dataflowDepth+"*******\n");
-
-               List<Port> inputs=null;
-               try {
-                       inputs = getPq().getInputPorts(topLevelDataflowName, 
topLevelDataflowID);
-
-                       for (Port input:inputs)  {
-
-                               //                              
logger.info("global input: "+input.getVName());
-
-                               Map<String,String> queryConstraints = new 
HashMap<String,String>();
-
-//                             queryConstraints.put("sourcePortName", 
input.getVName());
-//                             queryConstraints.put("sourceProcessorName", 
input.getPName());
-                               queryConstraints.put("sourcePortId", 
input.getIdentifier());
-                               queryConstraints.put("workflowId", 
input.getWorkflowId());
-                               List<DataLink> outgoingDataLinks = 
getPq().getDataLinks(queryConstraints);
-
-                               // any datalink will do, use the first
-                               String targetPname = 
outgoingDataLinks.get(0).getDestinationProcessorName();
-                               String targetVname = 
outgoingDataLinks.get(0).getDestinationPortName();
-
-//                             logger.info("copying values from 
["+targetPname+":"+targetVname+"] for instance ID: ["+workflowRunId+"]");
-
-                               queryConstraints.clear();
-                               queryConstraints.put("V.portName", targetVname);
-                               queryConstraints.put("V.processorName", 
targetPname);
-                               queryConstraints.put("VB.workflowRunId", 
getWorkflowRunId());
-                               queryConstraints.put("V.workflowId", 
topLevelDataflowID);
-
-                               for (PortBinding vb : 
getPq().getPortBindings(queryConstraints)) {
-                                       PortBinding inputPortBinding = new 
PortBinding(vb);
-
-                                       // insert PortBinding back into VB with 
the global input portName
-                                       
inputPortBinding.setProcessorName(input.getProcessorName());
-                                       
inputPortBinding.setPortName(input.getPortName());
-                                       try {
-                                               
getPw().addPortBinding(inputPortBinding);
-                                       } catch (SQLException ex) {
-                                               logger.info("Already logged 
port binding", ex);
-                                       }
-                               }
-                       }
-               } catch (SQLException e) {
-                       logger.warn("Patch top level inputs problem for 
provenance", e);
-               } catch (IndexOutOfBoundsException e) {
-                       logger.error("Could not patch top level", e);
-               }
-       }
-
-       public void reconcileTopLevelOutputs() {
-               reconcileLocalOutputs(topLevelDataflowID);
-       }
-
-       // PM added 23/4/09
-       /**
-        * reconcile the top level outputs with the results from its immediate 
precedessors in the graph.<br/>
-        * various cases have to be considered: predecessors may include 
records that are not in the output,
-        * while the output may include nested list structures that are not in 
the precedessors. This method accounts
-        * for a 2-way reconciliation that considers all possible cases.<br/>
-        * at the end, outputs and their predecessors contain the same data.<p/>
-        * NOTE: if we assume that data values (URIs) are <em>always</em> 
unique then this is greatly simplified by just
-        * comparing two sets of value records by their URIs and reconciling 
them. But this is not the way it is done here
-        */
-       public void reconcileLocalOutputs(String dataflowID) {
-               /*
-       for each output O
-
-       for each variable V in predecessors(O)
-
-       fetch all VB records for O into list OValues
-       fetch all VB records for V  into list Yalues
-
-       compare OValues and VValues:
-       it SHOULD be the case that OValues is a subset of YValues. Under this 
assumption:
-
-       for each vb in YValues:
-       - if there is a matching o in OValues then (vb may be missing 
collection information)
-           copy o to vb
-         else
-           if vb has no collection info && there is a matching tree node tn  
in OTree (use iteration index for the match) then
-              set vb to be in collection tb
-              copy vb to o
-
-     finally copy all Collection records for O in OTree -- catch duplicate 
errors
-                */
-
-               Map<String, String> queryConstraints = new HashMap<>();
-
-               try {
-                       // for each output O
-                       for (Port 
output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID))  {
-                               // collect all VBs for O
-//                             String oPName = output.getPName();
-//                             String oVName = output.getVName();
-//                             queryConstraints.put("V.portName", oVName);
-//                             queryConstraints.put("V.processorName", oPName);
-//                             queryConstraints.put("VB.workflowRunId", 
workflowRunId);
-//                             queryConstraints.put("V.workflowId", 
topLevelDataflowID);
-
-//                             List<PortBinding> OValues = 
pq.getPortBindings(queryConstraints);
-
-                               // find all records for the immediate 
precedessor Y of O
-                               queryConstraints.clear();
-//                             queryConstraints.put("destinationPortName", 
output.getVName());
-//                             
queryConstraints.put("destinationProcessorName", output.getPName());
-                               queryConstraints.put("destinationPortId", 
output.getIdentifier());
-                               queryConstraints.put("workflowId", 
output.getWorkflowId());
-                               List<DataLink> incomingDataLinks = 
pq.getDataLinks(queryConstraints);
-
-                               // there can be only one -- but check that 
there is one!
-                               if (incomingDataLinks.isEmpty())
-                                       continue;
-
-                               String sourcePname = 
incomingDataLinks.get(0).getSourceProcessorName();
-                               String sourceVname = 
incomingDataLinks.get(0).getSourcePortName();
-
-                               queryConstraints.clear();
-                               queryConstraints.put("V.portName", sourceVname);
-                               queryConstraints.put("V.processorName", 
sourcePname);
-                               queryConstraints.put("VB.workflowRunId", 
getWorkflowRunId());
-                               queryConstraints.put("V.workflowId", 
topLevelDataflowID);
-
-                               List<PortBinding> YValues = 
pq.getPortBindings(queryConstraints);
-
-                               // for each YValue look for a match in OValues
-                               // (assume the YValues values are a superset of 
OValues)!)
-
-                               for (PortBinding yValue:YValues) {
-                                       // look for a matching record in 
PortBinding for output O
-                                       queryConstraints.clear();
-                                       queryConstraints.put("V.portName", 
output.getPortName());
-                                       queryConstraints.put("V.processorName", 
output.getProcessorName());
-                                       
queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
-                                       queryConstraints.put("V.workflowid", 
topLevelDataflowID);
-                                       queryConstraints.put("VB.iteration", 
yValue.getIteration());
-                                       if (yValue.getCollIDRef()!= null) {
-                                               
queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
-                                               
queryConstraints.put("VB.positionInColl", 
Integer.toString(yValue.getPositionInColl()));
-                                       }
-                                       List<PortBinding> matchingOValues = 
pq.getPortBindings(queryConstraints);
-
-                                       // result at most size 1
-                                       if (!matchingOValues.isEmpty()) {
-                                               PortBinding oValue = 
matchingOValues.get(0);
-
-                                               // copy collection info from 
oValue to yValue
-                                               
yValue.setCollIDRef(oValue.getCollIDRef());
-                                               
yValue.setPositionInColl(oValue.getPositionInColl());
-
-                                               pw.updatePortBinding(yValue);
-                                       } else {
-                                               // copy the yValue to O
-                                               // insert PortBinding back into 
VB with the global output portName
-                                               
yValue.setProcessorName(output.getProcessorName());
-                                               
yValue.setPortName(output.getPortName());
-                                               pw.addPortBinding(yValue);
-                                       }
-
-                               } // for each yValue in YValues
-
-                               // copy all Collection records for O to Y
-
-                               // get all collections refs for O
-                               queryConstraints.clear();
-                               queryConstraints.put("workflowRunId", 
getWorkflowRunId());
-                               queryConstraints.put("processorNameRef", 
output.getProcessorName());
-                               queryConstraints.put("portName", 
output.getPortName());
-
-                               List<NestedListNode> oCollections = 
pq.getNestedListNodes(queryConstraints);
-
-                               // insert back as collection refs for Y -- 
catch duplicates
-                               for (NestedListNode nln:oCollections) {
-                                       nln.setProcessorName(sourcePname);
-                                       nln.setProcessorName(sourceVname);
-
-                                       getPw().replaceCollectionRecord(nln, 
sourcePname, sourceVname);
-                               }
-
-                       } // for each output var
-
-               } catch (SQLException e) {
-                       logger.warn("Problem reconciling top level outputs", e);
-               }
-
-       }
-
-       @SuppressWarnings("unchecked")
-       private void processOutput(OutputDataProvenanceItem provenanceItem,
-                       ProcessorBinding procBinding, String currentWorkflowID) 
{
-               Element dataItemAsXML = getDataItemAsXML(provenanceItem);
-               List<Element> outputPorts = dataItemAsXML.getChildren("port");
-               for (Element outputport : outputPorts) {
-                       String portName = outputport.getAttributeValue("name");
-
-                       // value type may vary
-                       List<Element> valueElements = outputport.getChildren();
-                       if (valueElements != null && !valueElements.isEmpty()) {
-                               Element valueEl = valueElements.get(0); // only 
really 1 child
-
-                               processPortBinding(valueEl, 
procBinding.getProcessorName(),
-                                               portName, 
procBinding.getIterationVector(),
-                                               getWorkflowRunId(), 
currentWorkflowID);
-                       }
-               }
-       }
-
-       /**
-        * this method reconciles values in varBindings across an datalink: 
Firstly,
-        * if vb's value is within a collection, _and_ it is copied from a value
-        * generated during a previous iteration, then this method propagates 
the
-        * list reference to that iteration value, which wouldn't have it.
-        * Conversely, if vb is going to be input to an iteration, then it's 
lost
-        * its containing list node, and we put it back in by looking at the
-        * corresponding predecessor
-        * 
-        * @param vb
-        * @throws SQLException
-        */
-       private void backpatchIterationResults(List<PortBinding> newBindings) 
throws SQLException {
-               logger.debug("backpatchIterationResults: start");
-               for (PortBinding vb : newBindings) {
-                       logger.debug("backpatchIterationResults: processing vb "
-                                       + vb.getProcessorName() + "/" + 
vb.getPortName() + "="
-                                       + vb.getValue());
-
-                       if (vb.getCollIDRef()!= null) // this is a member of a 
collection
-                               logger.debug("...which is inside a collection 
");
-
-                       // look for its antecedent
-                       Map<String,String> queryConstraints = new HashMap<>();
-                       queryConstraints.put("destinationPortName", 
vb.getPortName());
-                       queryConstraints.put("destinationProcessorName", 
vb.getProcessorName());
-                       queryConstraints.put("workflowId", 
pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0));  // CHECK picking first 
element in list...
-                       List<DataLink> incomingDataLinks = 
pq.getDataLinks(queryConstraints);
-
-                       // there can be only one -- but check that there is one!
-                       if (incomingDataLinks.isEmpty())
-                               return;
-
-                       String sourcePname = 
incomingDataLinks.get(0).getSourceProcessorName();
-                       String sourceVname = 
incomingDataLinks.get(0).getSourcePortName();
-
-                       logger.debug("antecedent: 
"+sourcePname+":"+sourceVname);
-
-                       // get the varbindings for this port and select the one 
with the same iteration vector as its successor
-                       queryConstraints.clear();
-                       queryConstraints.put("VB.portName", sourceVname);
-                       queryConstraints.put("V.processorName", sourcePname);
-                       queryConstraints.put("VB.value", vb.getValue());
-                       queryConstraints.put("VB.workflowRunId", 
vb.getWorkflowRunId());
-
-                       // reconcile
-                       for (PortBinding b : 
pq.getPortBindings(queryConstraints)) {
-                               logger.debug("backpatching " + sourceVname + " 
" + sourcePname);
-
-                               if (vb.getCollIDRef() != null && 
b.getCollIDRef() == null) {
-                                       logger.debug("successor " + 
vb.getPortName()
-                                                       + " is in collection " 
+ vb.getCollIDRef()
-                                                       + " but pred " + 
b.getPortName() + " is not");
-                                       logger.debug("putting " + 
b.getPortName()
-                                                       + " in collection " + 
vb.getCollIDRef()
-                                                       + " at pos " + 
vb.getPositionInColl());
-                                       b.setCollIDRef(vb.getCollIDRef());
-                                       
b.setPositionInColl(vb.getPositionInColl());
-                                       getPw().updatePortBinding(b);
-
-                               } else if (vb.getCollIDRef() == null && 
b.getCollIDRef() != null) {
-                                       logger.debug("successor " + 
vb.getPortName()
-                                                       + " is NOT in 
collection but pred "
-                                                       + b.getPortName() + " 
IS");
-                                       logger.debug("putting " + 
vb.getPortName()
-                                                       + " in collection " + 
b.getCollIDRef() + " at pos "
-                                                       + 
b.getPositionInColl());
-                                       vb.setCollIDRef(b.getCollIDRef());
-                                       
vb.setPositionInColl(b.getPositionInColl());
-                                       getPw().updatePortBinding(vb);
-                               }
-                       }
-               }
-       }
-
-
-       /**
-        * create one new PortBinding record for each input port binding
-        * @param currentWorkflowID
-        */
-       @SuppressWarnings("unchecked")
-       private void processInput(InputDataProvenanceItem provenanceItem,
-                       ProcessorBinding procBinding, String currentWorkflowID) 
{
-               Element dataItemAsXML = getDataItemAsXML(provenanceItem);
-               int order = 0;
-               for (Element inputport : (List<Element>) 
dataItemAsXML.getChildren("port")) {
-                       String portName = inputport.getAttributeValue("name");
-
-                       try {
-                               // add process order sequence to Port for this 
portName
-                               Map<String, String> queryConstraints = new 
HashMap<>();
-                               queryConstraints.put("V.workflowId", 
currentWorkflowID);
-                               queryConstraints.put("processorName", 
procBinding.getProcessorName());
-                               queryConstraints.put("portName", portName);
-                               queryConstraints.put("isInputPort", "1");
-
-                               Port v = 
getPq().getPorts(queryConstraints).get(0);
-                               v.setIterationStrategyOrder(order++);
-                               getPw().updatePort(v);
-                       } catch (IndexOutOfBoundsException e) {
-                               logger.error("Could not process input " + 
portName, e);
-                       } catch (SQLException e1) {
-                               logger.error("Could not process input " + 
portName, e1);
-                       }
-
-                       // value type may vary
-                       List<Element> valueElements = inputport.getChildren(); 
// hopefully
-                       // in the right order...
-                       if (valueElements != null && valueElements.size() > 0) {
-                               Element valueEl = valueElements.get(0); // 
expect only 1 child
-                               //                              
processVarBinding(valueEl, processor, portName, iterationVector,
-                               //                              dataflow);
-
-                               List<PortBinding> newBindings = 
processPortBinding(valueEl,
-                                               procBinding.getProcessorName(), 
portName,
-                                               
procBinding.getIterationVector(), getWorkflowRunId(),
-                                               currentWorkflowID);
-                               // this is a list whenever valueEl is of type 
list: in this case processVarBinding recursively
-                               // processes all values within the collection, 
and generates one PortBinding record for each of them
-
-                               allInputVarBindings.addAll(newBindings);
-
-                               //                              // if the new 
binding involves list values, then check to see if they need to be propagated 
back to
-//                             // results of iterations
-
-                               // Backpatching disabled as it is very 
inefficient and not needed
-                               // for current Taverna usage
-
-                               try {
-                                       if (backpatching)
-                                               
backpatchIterationResults(newBindings);
-                               } catch (SQLException e) {
-                                       logger.warn("Problem with back patching 
iteration results", e);
-                               }
-                       } else {
-                               if (valueElements != null)
-                                       logger.debug("port name " + portName + 
"  "
-                                                       + valueElements.size());
-                               else
-                                       logger.debug("valueElements is null for 
port name "
-                                                       + portName);
-                       }
-               }
-       }
-
-       /**
-        * capture the default case where the value is not a list
-        *
-        * @param valueEl
-        * @param processorId
-        * @param portName
-        * @param iterationId
-        * @param workflowRunId
-        * @param currentWorkflowID
-        */
-       private List<PortBinding> processPortBinding(Element valueEl,
-                       String processorId, String portName, String iterationId,
-                       String workflowRunId, String currentWorkflowID) {
-               // uses the defaults:
-               // collIdRef = null
-               // parentcollectionRef = null
-               // positionInCollection = 1
-               return processPortBinding(valueEl, processorId, portName, null, 
1, null,
-                               iterationId, workflowRunId, null, 
currentWorkflowID);
-       }
-
-       /**
-        * general case where value can be a list
-        * @param valueEl
-        * @param processorId
-        * @param portName
-        * @param collIdRef
-        * @param positionInCollection
-        * @param parentCollectionRef
-        * @param iterationId
-        * @param workflowRunId
-        * @param currentWorkflowID
-        */
-       @SuppressWarnings("unchecked")
-       private List<PortBinding> processPortBinding(Element valueEl,
-                       String processorId, String portName, String collIdRef,
-                       int positionInCollection, String parentCollectionRef,
-                       String iterationId, String workflowRunId, String 
itVector,
-                       String currentWorkflowID) {
-               List<PortBinding> newBindings = new ArrayList<>();
-
-               String valueType = valueEl.getName();
-               //              logger.info("value element for " + processorId 
+ ": "
-               //              + valueType);
-
-               String iterationVector = (itVector == null ? 
extractIterationVector(iterationId)
-                               : itVector);
-
-               PortBinding vb = new PortBinding();
-
-               vb.setWorkflowId(currentWorkflowID);
-               vb.setWorkflowRunId(workflowRunId);
-               vb.setProcessorName(processorId);
-               vb.setValueType(valueType);
-               vb.setPortName(portName);
-               vb.setCollIDRef(collIdRef);
-               vb.setPositionInColl(positionInCollection);
-
-               newBindings.add(vb);
-
-               if (valueType.equals("literal")) {
-                       try {
-                               vb.setIteration(iterationVector);
-                               vb.setValue(valueEl.getAttributeValue("id"));
-                               logger.debug("new input VB with 
workflowId="+currentWorkflowID+" processorId="+processorId+
-                                               " valueType="+valueType+" 
portName="+portName+" collIdRef="+collIdRef+
-                                               " 
position="+positionInCollection+" itvector="+iterationVector+
-                                               " value="+vb.getValue());
-                               getPw().addPortBinding(vb);
-                       } catch (SQLException e) {
-                               logger.warn("Process Port Binding problem with 
provenance", e);
-                       }
-
-               } else if (valueType.equals("referenceSet")) {
-                       vb.setIteration(iterationVector);
-                       vb.setValue(valueEl.getAttributeValue("id"));
-                       vb.setReference(valueEl.getChildText("reference"));
-
-                       logger.debug("new input VB with workflowId=" + 
currentWorkflowID
-                                       + " processorId=" + processorId + " 
valueType=" + valueType
-                                       + " portName=" + portName + " 
collIdRef=" + collIdRef
-                                       + " position=" + positionInCollection + 
" itvector="
-                                       + iterationVector + " value=" + 
vb.getValue());
-
-                       try {
-                               getPw().addPortBinding(vb);
-                       } catch (SQLException e) {
-                               logger.debug("Problem processing var binding -- 
performing update instead of insert", e); //, e);
-                               // try to update the existing record instead 
using the current collection info
-
-                               getPw().updatePortBinding(vb);
-                       }
-
-               } else if (valueType.equals("list")) {
-                       logger.debug("input of type list");
-
-                       // add entries to the Collection and to the PortBinding 
tables
-                       // list id --> Collection.collId
-
-                       String collId = valueEl.getAttributeValue("id");
-                       try {
-                               parentCollectionRef = 
getPw().addCollection(processorId, collId,
-                                               parentCollectionRef, 
iterationVector, portName,
-                                               workflowRunId);
-
-                               // iterate over each list element
-                               List<Element> listElements = 
valueEl.getChildren();
-
-                               positionInCollection = 1;  // also use this as 
a suffix to extend the iteration vector
-
-                               // extend iteration vector to account for 
additional levels within the list
-
-                               String originalIterationVector = 
iterationVector;
-
-                               // children can be any base type, including 
list itself -- so
-                               // use recursion
-                               for (Element el : listElements) {
-                                       if (originalIterationVector.length() > 
2) // vector is not empty
-                                               iterationVector = 
originalIterationVector.substring(0,
-                                                               
originalIterationVector.length()-1) + ","+
-                                                               
Integer.toString(positionInCollection-1) + "]";
-                                       else
-                                               iterationVector = "["+ 
(positionInCollection-1) + "]";
-
-                                       
newBindings.addAll(processPortBinding(el, processorId,
-                                                       portName, collId, 
positionInCollection,
-                                                       parentCollectionRef, 
iterationId, workflowRunId,
-                                                       iterationVector, 
currentWorkflowID));
-
-                                       positionInCollection++;
-                               }
-                       } catch (SQLException e) {
-                               logger.warn("Problem processing var binding", 
e);
-                       }
-               } else if (valueType.equals("error")) {
-                       vb.setIteration(iterationVector);
-                       vb.setValue(valueEl.getAttributeValue("id"));
-                       vb.setReference(valueEl.getChildText("reference"));
-                       try {
-                               getPw().addPortBinding(vb);
-                       } catch (SQLException e) {
-                               logger.debug("Problem processing var binding -- 
performing update instead of insert", e); //, e);
-                               // try to update the existing record instead 
using the current collection info
-
-                               getPw().updatePortBinding(vb);
-                       }
-               } else {
-                       logger.warn("unrecognized value type element for "
-                                       + processorId + ": " + valueType);
-               }
-
-               return newBindings;
-       }
-
-
-       /**
-        * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...]
-        * <p/>
-        * now returns the vector itself -- this is still experimental
-        *
-        * @param iteration
-        * @return
-        */
-       @Deprecated
-       String extractIterationVector(String iteration) {
-               return iteration;
-       }
-
-       /**
-        * silly class to hold pairs of strings. any better way??
-        * @author paolo
-        *
-        */
-       class Pair {
-               String v1, v2;
-
-               public Pair(String current, String workflowId) {
-                       v1=current; v2=workflowId;
-               }
-
-               /**
-                * @return the v1
-                */
-               public String getV1() {
-                       return v1;
-               }
-
-               /**
-                * @param v1 the v1 to set
-                */
-               public void setV1(String v1) {
-                       this.v1 = v1;
-               }
-
-               /**
-                * @return the v2
-                */
-               public String getV2() {
-                       return v2;
-               }
-
-               /**
-                * @param v2 the v2 to set
-                */
-               public void setV2(String v2) {
-                       this.v2 = v2;
-               }
-       }
-
-       @SuppressWarnings("deprecation")
-       public List<Pair> toposort(String dataflowName, String workflowRunId) 
throws SQLException  {
-
-//             String workflowId = pq.getworkflowIdForDataflow(dataflowName, 
workflowRunId);
-               String workflowId = 
pq.getWorkflowIdForExternalName(dataflowName);
-
-               // fetch processors along with the count of their predecessors
-               Map<String, Integer> predecessorsCount = 
getPq().getPredecessorsCount(workflowRunId);
-               Map<String, List<String>> successorsOf = new HashMap<String, 
List<String>>();
-//             List<String> procList = pq.getContainedProcessors(dataflowName, 
workflowRunId);
-               List<String> procList = pq.getContainedProcessors(dataflowName);
-
-               for (String s:procList) {
-                       List<String> successors = getPq().getSuccProcessors(s, 
workflowId, workflowRunId);
-                       successorsOf.put(s, successors);
-               }
-
-               List<Pair>  sorted = tsort(procList, dataflowName, 
predecessorsCount, successorsOf, workflowId, workflowRunId);
-
-               for (int i=0; i< sorted.size(); i++) {
-                       String procName = sorted.get(i).getV1();
-
-                       if (pq.isDataflow(procName) && 
!procName.equals(dataflowName)) {  // handle weirdness: a dataflow is contained 
within itself..
-                               // recurse on procName
-                               List<Pair> sortedSublist = toposort(procName, 
workflowRunId);
-
-                               // replace procName with sortedSublist in sorted
-                               sorted.remove(i);
-                               sorted.addAll(i, sortedSublist);
-                       }
-               }
-               return sorted;
-       }
-
-
-
-       /**
-        * @param procList
-        * @param predecessorsCount
-        * @param successorsOf
-        * @param workflowRunId
-        * @return
-        * @throws SQLException
-        */
-       public List<Pair> tsort(List<String> procList, String dataflowName,
-                       Map<String, Integer> predecessorsCount,
-                       Map<String, List<String>> successorsOf, String 
workflowId,
-                       String workflowRunId) throws SQLException {
-               List<Pair> l = new ArrayList<>();               // holds sorted 
elements
-               List<String> q = new ArrayList<>();     // temp queue
-
-               // init queue with procList processors that have no predecessors
-               for (String proc:procList)
-                       if (predecessorsCount.get(proc) == null || 
predecessorsCount.get(proc) == 0 &&
-                                       !proc.equals(dataflowName))
-                               q.add(proc);
-
-               while (!q.isEmpty()) {
-                       String current = q.remove(0);
-                       l.add(new Pair(current, workflowId));
-
-                       List<String> successors = successorsOf.get(current);
-
-                       if (successors == null)
-                               continue;
-
-                       // reduce the number of predecessors to each of the 
successors by one
-                       // NB we must traverse an additional datalink through a 
nested workflow input if the successor is a dataflow!!
-                       for (String succ : successors) {
-                               // decrease edge count for each successor 
processor
-                               predecessorsCount.put(succ, 
predecessorsCount.get(succ) - 1);
-
-                               if (predecessorsCount.get(succ) == 0 && 
!succ.equals(dataflowName))
-                                       q.add(succ);
-                       }
-               } // end loop on q
-               return l;
-       }
-
-       @SuppressWarnings("deprecation")
-       public void propagateANL(String workflowRunId) throws SQLException {
-               String top = pq.getTopLevelDataflowName(workflowRunId);
-
-               // //////////////////////
-               // PHASE I: toposort the processors in the whole graph
-               // //////////////////////
-               List<Pair> sorted = toposort(top, workflowRunId);
-
-               List<String> sortedProcessors = new ArrayList<>();
-
-               for (Pair p : sorted)
-                       sortedProcessors.add(p.getV1());
-
-               logger.debug("final sorted list of processors");
-               for (Pair p : sorted)
-                       logger.debug(p.getV1() + " in workflowId " + p.getV2());
-
-               // //////////////////////
-               // PHASE II: traverse and set anl on each port
-               // //////////////////////
-
-               //              // sorted processor names in L at this point
-               //              // process them in order
-               for (Pair pnameInContext : sorted) {
-                       //                      // process pname's inputs -- 
set ANL to be the DNL if not set in prior steps
-                       String pname     = pnameInContext.getV1();
-                       String workflowId = pnameInContext.getV2();
-
-                       List<Port> inputs = getPq().getInputPorts(pname, 
workflowId); // null -> do not use instance (??) CHECK
-
-                       int totalANL = 0;
-                       for (Port iv : inputs) {
-
-                               if (! iv.isResolvedDepthSet()) {
-                                       iv.setResolvedDepth(iv.getDepth());
-                                       getPw().updatePort(iv);
-                               }
-
-                               int delta_nl = iv.getResolvedDepth() - 
iv.getDepth();
-
-                               // if delta_nl < 0 then Taverna wraps the value 
into a list --> use dnl(X) in this case
-                               if (delta_nl < 0 ) delta_nl = 0;// CHECK 
iv.getTypedepth();
-
-                               totalANL += delta_nl;
-
-                               // this should take care of the special case of 
the top level dataflow with inputs that have successors in the graph
-                               // propagate this through all the links from 
this var
-//                             List<Port> successors = 
getPq().getSuccVars(pname, iv.getVName(), workflowRunId);
-
-//                             for (Port v : successors) {
-//                             v.setresolvedDepth(iv.getresolvedDepth());
-//                             getPw().updateVar(v);
-//                             }
-                       }
-
-                       // process pname's outputs -- set ANL based on the sum 
formula (see
-                       // paper)
-                       for (Port ov : getPq().getOutputPorts(pname, 
workflowId)) {
-
-                               ov.setResolvedDepth(ov.getDepth() + totalANL);
-
-                               logger.debug("anl for 
"+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL));
-                               getPw().updatePort(ov);
-
-                               // propagate this through all the links from 
this var
-                               for (Port v : getPq().getSuccPorts(pname, 
ov.getPortName(), workflowId)) {
-                                       List<Port> toBeProcessed = new 
ArrayList<>();
-                                       toBeProcessed.add(v);
-
-                                       if (v.getProcessorId() == null && 
v.isInputPort()) {  // this is the input to a nested workflow
-//                                             String tempWorkflowId = 
pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
-                                               String tempWorkflowId = pq
-                                                               
.getWorkflowIdForExternalName(v
-                                                                               
.getProcessorName());
-                                               List<Port> realSuccessors = 
getPq().getSuccPorts(
-                                                               
v.getProcessorName(), v.getPortName(),
-                                                               tempWorkflowId);
-
-                                               toBeProcessed.remove(0);
-                                               
toBeProcessed.addAll(realSuccessors);
-
-                                       } else if (v.getProcessorId() == null 
&& !v.isInputPort()) {  // this is the output to a nested workflow
-//                                             String tempworkflowId = 
pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
-                                               List<Port> realSuccessors = 
getPq().getSuccPorts(
-                                                               
v.getProcessorName(), v.getPortName(), null);
-
-                                               toBeProcessed.remove(0);
-                                               
toBeProcessed.addAll(realSuccessors);
-                                       }
-
-                                       for (Port v1 : toBeProcessed) {
-                                               
v1.setResolvedDepth(ov.getResolvedDepth());
-                                               logger.debug("anl for " + 
v1.getProcessorName() + ":"
-                                                               + 
v1.getPortName() + " = "
-                                                               + 
ov.getResolvedDepth());
-                                               getPw().updatePort(v1);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       public void setPw(ProvenanceWriter pw) {
-               this.pw = pw;
-       }
-
-       public ProvenanceWriter getPw() {
-               return pw;
-       }
-
-       public void setPq(ProvenanceQuery pq) {
-               this.pq = pq;
-       }
-
-       public ProvenanceQuery getPq() {
-               return pq;
-       }
-
-       public void setWorkflowRunId(String workflowRunId) {
-               this.workflowRunId = workflowRunId;
-       }
-
-       public String getWorkflowRunId() {
-               return workflowRunId;
-       }
-
-       public void setWfdp(WorkflowDataProcessor wfdp) {
-               this.wfdp = wfdp;
-       }
-
-       public WorkflowDataProcessor getWfdp() {
-               return wfdp;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
----------------------------------------------------------------------
diff --git 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
 
b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
deleted file mode 100644
index 975a83a..0000000
--- 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- 
******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-/**
- * 
- * This Java bean holds a single provenance record, i.e., the finest element of
- * a provenance graph that is stored in the provenance DB. Essentially this
- * represents one data element (value) flowing through a port (vname) of a
- * processor (pname), in the context of one run (wfInstance) of a workflow
- * (wfname). The record may include an <b>iteration</b> vector, used when the
- * same processor receives multiple values on the same port, as part of
- * iterative processing. When the value belongs to a collection (a nested 
list),
- * the <b>collIdRef</b> field contains a reference to that collection.
- * 
- * @author Paolo Missier
- * 
- */
-public class LineageQueryResultRecord {
-       private String workflowId;
-       private String processorName;
-       private String portName;
-       private String workflowRunId;
-       private String iteration;
-       private String value;     // atomic or XML-formatted collection -- this 
is actually a reference to the value...
-       private String collIdRef;
-       private String parentCollIDRef;
-       private String resolvedValue;
-       private String type;  // one of referenceSet, referenceSetCollection
-       boolean printResolvedValue;
-       boolean isInput; 
-       boolean isCollection;
-
-       @Override
-       public String toString() {
-               if (isCollection) {
-                       return "COLLECTION: proc " + getProcessorName() + " var 
"
-                                       + getPortName() + " " + " iteration: " 
+ getIteration()
-                                       + " value: " + getValue() + " 
collection id: "
-                                       + getCollectionT2Reference() + " parent 
collection: "
-                                       + getParentCollIDRef();
-               } else if (printResolvedValue)
-                       return "workflow " + getworkflowId() + " proc "
-                                       + getProcessorName() + " var " + 
getPortName() + " "
-                                       + " iteration: " + getIteration() + " 
value: " + getValue()
-                                       + " collection id: " + 
getCollectionT2Reference()
-                                       + " resolvedValue: " + 
getResolvedValue();
-               else
-                       return "workflow " + getworkflowId() + " proc "
-                                       + getProcessorName() + " var " + 
getPortName() + " "
-                                       + " iteration: " + getIteration() + " 
collection id: "
-                                       + getCollectionT2Reference() + " value: 
" + getValue();
-       }
-
-       /**
-        * @return the pname
-        */
-       public String getProcessorName() {
-               return processorName;
-       }
-       /**
-        * @param pname the pname to set
-        */
-       public void setProcessorName(String pname) {
-               this.processorName = pname;
-       }
-       /**
-        * @return the vname
-        */
-       public String getPortName() {
-               return portName;
-       }
-       /**
-        * @param vname the vname to set
-        */
-       public void setPortName(String vname) {
-               this.portName = vname;
-       }
-       /**
-        * @return the workflowRun
-        */
-       public String getWorkflowRunId() {
-               return workflowRunId;
-       }
-       /**
-        * @param workflowRun the workflowRun to set
-        */
-       public void setWorkflowRunId(String workflowRun) {
-               this.workflowRunId = workflowRun;
-       }
-       /**
-        * @return the value
-        */
-       public String getValue() {
-               return value;
-       }
-       /**
-        * @param value the value to set
-        */
-       public void setValue(String value) {
-               this.value = value;
-       }
-       /**
-        * @return the type
-        */
-       public String getType() {
-               return type;
-       }
-       /**
-        * @param type the type to set
-        */
-       public void setType(String type) {
-               this.type = type;
-       }
-       /**
-        * @return the iteration
-        */
-       public String getIteration() {
-               return iteration;
-       }
-       /**
-        * @param iteration the iteration to set
-        */
-       public void setIteration(String iteration) {
-               this.iteration = iteration;
-       }
-       /**
-        * @return the resolvedValue
-        */
-       public String getResolvedValue() {
-               return resolvedValue;
-       }
-       /**
-        * @param resolvedValue the resolvedValue to set
-        */
-       public void setResolvedValue(String resolvedValue) {
-               this.resolvedValue = resolvedValue;
-       }
-
-
-       public void setPrintResolvedValue(boolean printResolvedValue) {
-               this.printResolvedValue = printResolvedValue;
-       }
-
-
-       /**
-        * @return the isInput
-        */
-       public boolean isInputPort() {
-               return isInput;
-       }
-
-
-       /**
-        * @param isInput the isInput to set
-        */
-       public void setIsInputPort(boolean isInput) {
-               this.isInput = isInput;
-       }
-
-
-       /**
-        * @return the collIdRef
-        */
-       public String getCollectionT2Reference() {
-               return collIdRef;
-       }
-
-
-       /**
-        * @param collIdRef the collIdRef to set
-        */
-       public void setCollectionT2Reference(String collIdRef) {
-               this.collIdRef = collIdRef;
-       }
-
-
-       /**
-        * @return the isCollection
-        */
-       public boolean isCollection() {
-               return isCollection;
-       }
-
-
-       /**
-        * @param isCollection the isCollection to set
-        */
-       public void setCollection(boolean isCollection) {
-               this.isCollection = isCollection;
-       }
-
-
-       /**
-        * @return the parentCollIDRef
-        */
-       public String getParentCollIDRef() {
-               return parentCollIDRef;
-       }
-
-
-       /**
-        * @param parentCollIDRef the parentCollIDRef to set
-        */
-       public void setParentCollIDRef(String parentCollIDRef) {
-               this.parentCollIDRef = parentCollIDRef;
-       }
-
-
-       /**
-        * @return the workflowId
-        */
-       public String getworkflowId() {
-               return workflowId;
-       }
-
-
-       /**
-        * @param workflowId the workflowId to set
-        */
-       public void setWorkflowId(String workflowId) {
-               this.workflowId = workflowId;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
----------------------------------------------------------------------
diff --git 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
 
b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
deleted file mode 100644
index 58d7c3d..0000000
--- 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
/**
 * Holds an SQL lineage query together with directives on how its result
 * should be read, i.e. which elements of the select clause are relevant.
 * <p>
 * Two query strings are carried ({@code vbQuery} and {@code collQuery}) plus
 * a {@code depth}. A depth of 0 means the variable values themselves are
 * used; a positive depth means the enclosing collection is used instead. For
 * example, when the query joins with Collection, lineage is meant to return
 * the collection itself rather than any of its elements.
 *
 * @author paolo
 */
public class LineageSQLQuery {
    /** 0 means use var values; greater than 0 means use enclosing collection. */
    private int depth;
    private String vbQuery;
    private String collQuery;

    /**
     * @return the interpretation depth (0 unless changed via
     *         {@link #setdepth(int)})
     */
    public int getdepth() {
        return this.depth;
    }

    /**
     * @param depth
     *            the interpretation depth; see the field comment for meaning
     */
    public void setdepth(final int depth) {
        this.depth = depth;
    }

    /**
     * @return the collection query, or {@code null} if none was set
     */
    public String getCollQuery() {
        return this.collQuery;
    }

    /**
     * @param collQuery
     *            the collection query to store
     */
    public void setCollQuery(final String collQuery) {
        this.collQuery = collQuery;
    }

    /**
     * @return the vb query, or {@code null} if none was set
     */
    public String getVbQuery() {
        return this.vbQuery;
    }

    /**
     * @param vbQuery
     *            the vb query to store
     */
    public void setVbQuery(final String vbQuery) {
        this.vbQuery = vbQuery;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
----------------------------------------------------------------------
diff --git 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
 
b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
deleted file mode 100644
index 621e351..0000000
--- 
a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import net.sf.taverna.t2.provenance.connector.AbstractProvenanceConnector;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-
-import org.apache.log4j.Logger;
-
-/**
- * Implemented by the database class that a {@link AbstractProvenanceConnector}
- * implementation uses for storage purposes
- * 
- * @author Paolo Missier
- * @author Ian Dunlop
- * 
- */
-//FIXME is this class really needed. Can't we just push the
-//acceptRawProvanceEvent up into the ProvenanceConnector?
-public class Provenance {
-       private static Logger logger = Logger.getLogger(Provenance.class);
-
-       protected ProvenanceQuery pq;
-       protected ProvenanceWriter pw;
-       protected EventProcessor ep;
-
-       private String saveEvents;
-       
-       private volatile boolean firstWorkflowStructure = true;
-
-       public boolean isFirstWorkflowStructure() {
-               return firstWorkflowStructure;
-       }
-
-       public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
-               this.firstWorkflowStructure = firstWorkflowStructure;
-       }
-
-       private List<String> workflowIDStack = Collections.synchronizedList(new 
ArrayList<String>());
-       
-       private Map<String, String> workflowIDMap = new 
ConcurrentHashMap<String, String>();
-
-       public Provenance() {   }
-
-       public Provenance(EventProcessor eventProcessor) {
-               this.ep = eventProcessor;
-               this.pq = ep.getPq();
-               this.pw = ep.getPw();           
-       }
-
-       public void clearDB() throws SQLException {
-               getPw().clearDBStatic();
-               getPw().clearDBDynamic();
-       }
-       
-
-       /**
-        * @return the saveEvents
-        */
-       public String getSaveEvents() {
-               return saveEvents;
-       }
-
-       /**
-        * @param saveEvents
-        *            the saveEvents to set
-        */
-       public void setSaveEvents(String saveEvents) {
-               this.saveEvents = saveEvents;
-       }
-
-       // FIXME I think the provenance query and writer should both come from 
the
-       // EventProcessor
-       // seems silly setting the ep, pq and pw separately.
-       public void setPq(ProvenanceQuery pq) {
-               this.pq = pq;
-       }
-
-       public ProvenanceQuery getPq() {
-               return pq;
-       }
-
-       public void setPw(ProvenanceWriter pw) {
-               this.pw = pw;
-       }
-
-       public ProvenanceWriter getPw() {
-               return pw;
-       }
-
-       public void setEp(EventProcessor ep) {
-               this.ep = ep;
-       }
-
-       public EventProcessor getEp() {
-               return ep;
-       }
-
-       /**
-        * maps each incoming event to an insert query into the provenance store
-        * 
-        * @param eventType
-        * @param content
-        * @throws SQLException
-        * @throws IOException
-        */
-       public void acceptRawProvenanceEvent(SharedVocabulary eventType,
-                       ProvenanceItem provenanceItem) throws SQLException, 
IOException {
-               processEvent(provenanceItem, eventType);
-       }
-
-       /**
-        * parse d and generate SQL insert calls into the provenance DB
-        * 
-        * @param d
-        *            DOM for the event
-        * @param eventType
-        *            see {@link SharedVocabulary}
-        * @throws SQLException
-        * @throws IOException
-        */
-       protected void processEvent(ProvenanceItem provenanceItem,
-                       SharedVocabulary eventType) throws SQLException, 
IOException {
-               if (eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE)) {
-                       // process the workflow structure
-                       //workflowStartedMap.put()
-                       WorkflowProvenanceItem workflowProvenanceItem = 
(WorkflowProvenanceItem) provenanceItem;
-                       
-                       
getEp().getWfdp().workflowStarted.put(workflowProvenanceItem.getIdentifier(), 
workflowProvenanceItem.getInvocationStarted());
-                       if (isFirstWorkflowStructure()) {
-                               String dataflowId = 
workflowProvenanceItem.getDataflow().getIdentifier();
-                               String instanceId = 
provenanceItem.getIdentifier();
-                               
-                               workflowIDMap.put(instanceId, dataflowId);
-                               setFirstWorkflowStructure(false);
-                               String processWorkflowStructure = 
getEp().processWorkflowStructure(provenanceItem);
-                               synchronized(workflowIDStack) {
-                                       
workflowIDStack.add(0,processWorkflowStructure);
-                               }
-                               
-                               
getEp().propagateANL(provenanceItem.getIdentifier());
-                       } else {
-                               String dataflowId = 
workflowProvenanceItem.getDataflow().getIdentifier();
-                               String instanceId = 
provenanceItem.getIdentifier();
-                               
-                               workflowIDMap.put(instanceId, dataflowId);
-
-                               Dataflow df = 
workflowProvenanceItem.getDataflow();
-                               synchronized(workflowIDStack) {
-                                       
workflowIDStack.add(0,df.getIdentifier());
-                               }
-                       }
-               } else if 
(provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE))
 {
-//                     String currentWorkflowID = workflowIDStack.get(0);
-//                     workflowIDStack.remove(0);
-                       String currentWorkflowID = provenanceItem.getParentId();
-                       
-                       getEp().processProcessEvent(provenanceItem, 
currentWorkflowID);
-                       
-               } else {  // all other event types (iteration etc.)
-                       logger.debug("processEvent of type 
"+provenanceItem.getEventType()+" for item of type 
"+provenanceItem.getClass().getName());
-                       String currentWorkflowID = 
provenanceItem.getWorkflowId();
-//                     String currentWorkflowID = 
workflowIDMap.get(provenanceItem.getParentId());
-
-                       getEp().processProcessEvent(provenanceItem, 
currentWorkflowID);
-               
-//                     getEp().processProcessEvent(provenanceItem, 
workflowIDStack.get(0));
-               }
-       }
-}

Reply via email to