http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
new file mode 100644
index 0000000..1868f94
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
@@ -0,0 +1,58 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * How to convert a notification about the completion of a job into a message.
+ * 
+ * @author Donal Fellows
+ */
+public interface CompletionNotifier {
+       /**
+        * @return The name of this notifier.
+        */
+       String getName();
+
+       /**
+        * Called to get the content of a message that a workflow run has 
finished.
+        * 
+        * @param name
+        *            The name of the run.
+        * @param run
+        *            What run are we talking about.
+        * @param code
+        *            What the exit code was.
+        * @return The plain-text content of the message.
+        */
+       String makeCompletionMessage(String name, RemoteRunDelegate run, int 
code);
+
+       /**
+        * Called to get the subject of the message to dispatch.
+        * 
+        * @param name
+        *            The name of the run.
+        * @param run
+        *            What run are we talking about.
+        * @param code
+        *            What the exit code was.
+        * @return The plain-text subject of the message.
+        */
+       String makeMessageSubject(String name, RemoteRunDelegate run, int code);
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
new file mode 100644
index 0000000..d38f0cc
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
@@ -0,0 +1,39 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.taverna.server.master.notification.atom.EventDAO;
+
+/**
+ * What the remote run really needs of its factory.
+ * 
+ * @author Donal Fellows
+ */
+public interface FactoryBean {
+       /**
+        * @return Whether a run can actually be started at this time.
+        */
+       boolean isAllowingRunsToStart();
+
+       /**
+        * @return a handle to the master Atom event feed (<i>not</i> the 
per-run
+        *         feed)
+        */
+       EventDAO getMasterEventFeed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
new file mode 100644
index 0000000..649db64
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
@@ -0,0 +1,73 @@
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple password issuing bean.
+ * 
+ * @author Donal Fellows
+ */
+public class PasswordIssuer {
+       private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 
'g',
+                       'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 
's', 't',
+                       'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 
'F', 'G',
+                       'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 
'S', 'T',
+                       'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', '3', '4', '5', 
'6', '7',
+                       '8', '9', '0', '!', '@', '#', '$', '%', '^', '&', '*', 
'(', ')',
+                       ',', '.', '<', '>', '/', '?', ':', ';', '-', '_', '+', 
'[', ']',
+                       '{', '}', '`', '~' };
+       private Log log = LogFactory.getLog("Taverna.Server.Worker");
+       private SecureRandom r;
+       private int length;
+
+       public PasswordIssuer() {
+               r = new SecureRandom();
+               log.info("constructing passwords with " + r.getAlgorithm());
+               setLength(8);
+       }
+
+       public PasswordIssuer(String algorithm) throws NoSuchAlgorithmException 
{
+               r = SecureRandom.getInstance(algorithm);
+               log.info("constructing passwords with " + r.getAlgorithm());
+               setLength(8);
+       }
+
+       public void setLength(int length) {
+               this.length = length;
+               log.info("issued password will be " + this.length
+                               + " symbols chosen from " + ALPHABET.length);
+       }
+
+       /**
+        * Issue a password.
+        * 
+        * @return The new password.
+        */
+       public String issue() {
+               StringBuilder sb = new StringBuilder();
+               for (int i = 0; i < length; i++)
+                       sb.append(ALPHABET[r.nextInt(ALPHABET.length)]);
+               log.info("issued new password of length " + sb.length());
+               return sb.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
new file mode 100644
index 0000000..37d5760
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
@@ -0,0 +1,171 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static 
org.taverna.server.master.identity.WorkflowInternalAuthProvider.PREFIX;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.taverna.server.master.common.Roles;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.NoCreateException;
+import org.taverna.server.master.exceptions.NoDestroyException;
+import org.taverna.server.master.exceptions.NoUpdateException;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Basic policy implementation that allows any workflow to be instantiated by
+ * any user, but which does not permit users to access each others workflow
+ * runs. It also imposes a global limit on the number of workflow runs at once.
+ * 
+ * @author Donal Fellows
+ */
+class PolicyImpl implements Policy {
+       Log log = LogFactory.getLog("Taverna.Server.Worker.Policy");
+       private PolicyLimits limits;
+       private RunDBSupport runDB;
+
+       @Required
+       public void setLimits(PolicyLimits limits) {
+               this.limits = limits;
+       }
+
+       @Required
+       public void setRunDB(RunDBSupport runDB) {
+               this.runDB = runDB;
+       }
+
+       @Override
+       public int getMaxRuns() {
+               return limits.getMaxRuns();
+       }
+
+       @Override
+       public Integer getMaxRuns(UsernamePrincipal user) {
+               return null;
+       }
+
+       @Override
+       public int getOperatingLimit() {
+               return limits.getOperatingLimit();
+       }
+
+       @Override
+       public List<URI> listPermittedWorkflowURIs(UsernamePrincipal user) {
+               return limits.getPermittedWorkflowURIs();
+       }
+
+       private boolean isSelfAccess(String runId) {
+               Authentication auth = SecurityContextHolder.getContext()
+                               .getAuthentication();
+               boolean self = false;
+               String id = null;
+               for (GrantedAuthority a : auth.getAuthorities()) {
+                       String aa = a.getAuthority();
+                       if (aa.equals(Roles.SELF)) {
+                               self = true;
+                               continue;
+                       }
+                       if (!aa.startsWith(PREFIX))
+                               continue;
+                       id = aa.substring(PREFIX.length());
+               }
+               return self && runId.equals(id);
+       }
+
+       @Override
+       public boolean permitAccess(UsernamePrincipal user, TavernaRun run) {
+               String username = user.getName();
+               TavernaSecurityContext context = run.getSecurityContext();
+               if (context.getOwner().getName().equals(username)) {
+                       if (log.isDebugEnabled())
+                               log.debug("granted access by " + user.getName() 
+ " to "
+                                               + run.getId());
+                       return true;
+               }
+               if (isSelfAccess(run.getId())) {
+                       if (log.isDebugEnabled())
+                               log.debug("access by workflow to itself: " + 
run.getId());
+                       return true;
+               }
+               if (log.isDebugEnabled())
+                       log.debug("considering access by " + user.getName() + " 
to "
+                                       + run.getId());
+               return context.getPermittedReaders().contains(username);
+       }
+
+       @Override
+       public void permitCreate(UsernamePrincipal user, Workflow workflow)
+                       throws NoCreateException {
+               if (user == null)
+                       throw new NoCreateException(
+                                       "anonymous workflow creation not 
allowed");
+               if (runDB.countRuns() >= getMaxRuns())
+                       throw new NoCreateException("server load exceeded; 
please wait");
+       }
+
+       @Override
+       public synchronized void permitDestroy(UsernamePrincipal user, 
TavernaRun run)
+                       throws NoDestroyException {
+               if (user == null)
+                       throw new NoDestroyException();
+               String username = user.getName();
+               TavernaSecurityContext context = run.getSecurityContext();
+               if (context.getOwner() == null
+                               || 
context.getOwner().getName().equals(username))
+                       return;
+               if (!context.getPermittedDestroyers().contains(username))
+                       throw new NoDestroyException();
+       }
+
+       @Override
+       public void permitUpdate(UsernamePrincipal user, TavernaRun run)
+                       throws NoUpdateException {
+               if (user == null)
+                       throw new NoUpdateException(
+                                       "workflow run not owned by you and 
you're not granted access");
+               TavernaSecurityContext context = run.getSecurityContext();
+               if (context.getOwner().getName().equals(user.getName()))
+                       return;
+               if (isSelfAccess(run.getId())) {
+                       if (log.isDebugEnabled())
+                               log.debug("update access by workflow to itself: 
" + run.getId());
+                       return;
+               }
+               if (!context.getPermittedUpdaters().contains(user.getName()))
+                       throw new NoUpdateException(
+                                       "workflow run not owned by you and 
you're not granted access");
+       }
+
+       @Override
+       public void setPermittedWorkflowURIs(UsernamePrincipal user,
+                       List<URI> permitted) {
+               limits.setPermittedWorkflowURIs(permitted);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
new file mode 100644
index 0000000..43c0aa4
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
@@ -0,0 +1,56 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.util.List;
+
+import org.taverna.server.master.common.Status;
+
+/**
+ * The worker policy delegates certain limits to the state model of the
+ * particular worker.
+ * 
+ * @author Donal Fellows
+ */
+public interface PolicyLimits {
+       /**
+        * @return the maximum number of extant workflow runs in any state
+        */
+       int getMaxRuns();
+
+       /**
+        * @return the maximum number of workflow runs in the
+        *         {@linkplain Status#Operating operating} state.
+        */
+       int getOperatingLimit();
+
+       /**
+        * @return the list of URIs to workflows that may be used to create 
workflow
+        *         runs. If empty or <tt>null</tt>, no restriction is present.
+        */
+       List<URI> getPermittedWorkflowURIs();
+
+       /**
+        * @param permitted
+        *            the list of URIs to workflows that may be used to create
+        *            workflow runs.
+        */
+       void setPermittedWorkflowURIs(List<URI> permitted);
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
new file mode 100644
index 0000000..fb1ac47
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
@@ -0,0 +1,980 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Calendar.MINUTE;
+import static java.util.Collections.sort;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.logging.LogFactory.getLog;
+import static 
org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename;
+import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PipedOutputStream;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteDirectoryEntry;
+import org.taverna.server.localworker.remote.RemoteFile;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.remote.StillWorkingOnItException;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.BadPropertyValueException;
+import org.taverna.server.master.exceptions.BadStateChangeException;
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.exceptions.NoListenerException;
+import org.taverna.server.master.exceptions.OverloadedException;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Directory;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+import org.taverna.server.master.interfaces.File;
+import org.taverna.server.master.interfaces.Input;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Bridging shim between the WebApp world and the RMI world.
+ * 
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+public class RemoteRunDelegate implements TavernaRun {
+       private transient Log log = getLog("Taverna.Server.Worker");
+       transient TavernaSecurityContext secContext;
+       Date creationInstant;
+       Workflow workflow;
+       Date expiry;
+       HashSet<String> readers;
+       HashSet<String> writers;
+       HashSet<String> destroyers;
+       transient String id;
+       transient RemoteSingleRun run;
+       transient RunDBSupport db;
+       transient FactoryBean factory;
+       boolean doneTransitionToFinished;
+       boolean generateProvenance;// FIXME expose
+       String name;
+       private static final String ELLIPSIS = "...";
+
+       public RemoteRunDelegate(Date creationInstant, Workflow workflow,
+                       RemoteSingleRun rsr, int defaultLifetime, RunDBSupport 
db, UUID id,
+                       boolean generateProvenance, FactoryBean factory) {
+               if (rsr == null)
+                       throw new IllegalArgumentException("remote run must not 
be null");
+               this.creationInstant = creationInstant;
+               this.workflow = workflow;
+               Calendar c = Calendar.getInstance();
+               c.add(MINUTE, defaultLifetime);
+               this.expiry = c.getTime();
+               this.run = rsr;
+               this.db = db;
+               this.generateProvenance = generateProvenance;
+               this.factory = factory;
+               try {
+                       this.name = "";
+                       String ci = " " + creationInstant;
+                       String n = workflow.getName();
+                       if (n.length() > NAME_LENGTH - ci.length())
+                               n = n.substring(0,
+                                               NAME_LENGTH - ci.length() - 
ELLIPSIS.length())
+                                               + ELLIPSIS;
+                       this.name = n + ci;
+               } catch (Exception e) {
+                       // Ignore; it's just a name, not something important.
+               }
+               if (id != null)
+                       this.id = id.toString();
+       }
+
+       RemoteRunDelegate() {
+       }
+
+       /**
+        * Get the types of listener supported by this run.
+        * 
+        * @return A list of listener type names.
+        * @throws RemoteException
+        *             If anything goes wrong.
+        */
+       public List<String> getListenerTypes() throws RemoteException {
+               return run.getListenerTypes();
+       }
+
+       @Override
+       public void addListener(Listener listener) {
+               if (listener instanceof ListenerDelegate)
+                       try {
+                               run.addListener(((ListenerDelegate) 
listener).getRemote());
+                       } catch (RemoteException e) {
+                               log.warn("communication problem adding 
listener", e);
+                       } catch (ImplementationException e) {
+                               log.warn("implementation problem adding 
listener", e);
+                       }
+               else
+                       log.fatal("bad listener " + listener.getClass()
+                                       + "; not applicable remotely!");
+       }
+
+       @Override
+       public String getId() {
+               if (id == null)
+                       id = randomUUID().toString();
+               return id;
+       }
+
+       /**
+        * Attach a listener to a workflow run and return its local delegate.
+        * 
+        * @param type
+        *            The type of listener to create.
+        * @param config
+        *            The configuration of the listener.
+        * @return The local delegate of the listener.
+        * @throws NoListenerException
+        *             If anything goes wrong.
+        */
+       public Listener makeListener(String type, String config)
+                       throws NoListenerException {
+               try {
+                       return new ListenerDelegate(run.makeListener(type, 
config));
+               } catch (RemoteException e) {
+                       throw new NoListenerException("failed to make 
listener", e);
+               }
+       }
+
+       @Override
+       public void destroy() {
+               try {
+                       run.destroy();
+               } catch (RemoteException | ImplementationException e) {
+                       log.warn("failed to destroy run", e);
+               }
+       }
+
+       @Override
+       public Date getExpiry() {
+               return new Date(expiry.getTime());
+       }
+
+       @Override
+       public List<Listener> getListeners() {
+               List<Listener> listeners = new ArrayList<>();
+               try {
+                       for (RemoteListener rl : run.getListeners())
+                               listeners.add(new ListenerDelegate(rl));
+               } catch (RemoteException e) {
+                       log.warn("failed to get listeners", e);
+               }
+               return listeners;
+       }
+
+       @Override
+       public TavernaSecurityContext getSecurityContext() {
+               return secContext;
+       }
+
+       @Override
+       public Status getStatus() {
+               try {
+                       switch (run.getStatus()) {
+                       case Initialized:
+                               return Status.Initialized;
+                       case Operating:
+                               return Status.Operating;
+                       case Stopped:
+                               return Status.Stopped;
+                       case Finished:
+                               return Status.Finished;
+                       }
+               } catch (RemoteException e) {
+                       log.warn("problem getting remote status", e);
+               }
+               return Status.Finished;
+       }
+
+       @Override
+       public Workflow getWorkflow() {
+               return workflow;
+       }
+
+       @Override
+       public Directory getWorkingDirectory() throws FilesystemAccessException 
{
+               try {
+                       return new DirectoryDelegate(run.getWorkingDirectory());
+               } catch (Throwable e) {
+                       if (e.getCause() != null)
+                               e = e.getCause();
+                       throw new FilesystemAccessException(
+                                       "problem getting main working directory 
handle", e);
+               }
+       }
+
+       @Override
+       public void setExpiry(Date d) {
+               if (d.after(new Date()))
+                       expiry = new Date(d.getTime());
+               db.flushToDisk(this);
+       }
+
+       @Override
+       public String setStatus(Status s) throws BadStateChangeException {
+               try {
+                       log.info("setting status of run " + id + " to " + s);
+                       switch (s) {
+                       case Initialized:
+                               run.setStatus(RemoteStatus.Initialized);
+                               break;
+                       case Operating:
+                               if (run.getStatus() == 
RemoteStatus.Initialized) {
+                                       if (!factory.isAllowingRunsToStart())
+                                               throw new OverloadedException();
+                                       secContext.conveySecurity();
+                               }
+                               run.setGenerateProvenance(generateProvenance);
+                               run.setStatus(RemoteStatus.Operating);
+                               factory.getMasterEventFeed()
+                                               .started(
+                                                               this,
+                                                               "started run 
execution",
+                                                               "The execution 
of run '" + getName()
+                                                                               
+ "' has started.");
+                               break;
+                       case Stopped:
+                               run.setStatus(RemoteStatus.Stopped);
+                               break;
+                       case Finished:
+                               run.setStatus(RemoteStatus.Finished);
+                               break;
+                       }
+                       return null;
+               } catch (IllegalStateTransitionException e) {
+                       throw new BadStateChangeException(e.getMessage());
+               } catch (RemoteException e) {
+                       throw new BadStateChangeException(e.getMessage(), 
e.getCause());
+               } catch (GeneralSecurityException | IOException e) {
+                       throw new BadStateChangeException(e.getMessage(), e);
+               } catch (ImplementationException e) {
+                       if (e.getCause() != null)
+                               throw new 
BadStateChangeException(e.getMessage(), e.getCause());
+                       throw new BadStateChangeException(e.getMessage(), e);
+               } catch (StillWorkingOnItException e) {
+                       log.info("still working on setting status of run " + id 
+ " to "
+                                       + s, e);
+                       return e.getMessage();
+               } catch (InterruptedException e) {
+                       throw new BadStateChangeException(
+                                       "interrupted while waiting to insert 
notification into database");
+               }
+       }
+
+       static void checkBadFilename(String filename)
+                       throws FilesystemAccessException {
+               if (filename.startsWith("/"))
+                       throw new FilesystemAccessException("filename may not 
be absolute");
+               if (Arrays.asList(filename.split("/")).contains(".."))
+                       throw new FilesystemAccessException(
+                                       "filename may not refer to parent");
+       }
+
+       @Override
+       public String getInputBaclavaFile() {
+               try {
+                       return run.getInputBaclavaFile();
+               } catch (RemoteException e) {
+                       log.warn("problem when fetching input baclava file", e);
+                       return null;
+               }
+       }
+
+       @Override
+       public List<Input> getInputs() {
+               ArrayList<Input> inputs = new ArrayList<>();
+               try {
+                       for (RemoteInput ri : run.getInputs())
+                               inputs.add(new RunInput(ri));
+               } catch (RemoteException e) {
+                       log.warn("problem when fetching list of workflow 
inputs", e);
+               }
+               return inputs;
+       }
+
+       @Override
+       public String getOutputBaclavaFile() {
+               try {
+                       return run.getOutputBaclavaFile();
+               } catch (RemoteException e) {
+                       log.warn("problem when fetching output baclava file", 
e);
+                       return null;
+               }
+       }
+
+       @Override
+       public Input makeInput(String name) throws BadStateChangeException {
+               try {
+                       return new RunInput(run.makeInput(name));
+               } catch (RemoteException e) {
+                       throw new BadStateChangeException("failed to make 
input", e);
+               }
+       }
+
+       @Override
+       public void setInputBaclavaFile(String filename)
+                       throws FilesystemAccessException, 
BadStateChangeException {
+               checkBadFilename(filename);
+               try {
+                       run.setInputBaclavaFile(filename);
+               } catch (RemoteException e) {
+                       throw new FilesystemAccessException(
+                                       "cannot set input baclava file name", 
e);
+               }
+       }
+
+       @Override
+       public void setOutputBaclavaFile(String filename)
+                       throws FilesystemAccessException, 
BadStateChangeException {
+               checkBadFilename(filename);
+               try {
+                       run.setOutputBaclavaFile(filename);
+               } catch (RemoteException e) {
+                       throw new FilesystemAccessException(
+                                       "cannot set output baclava file name", 
e);
+               }
+       }
+
+       @Override
+       public Date getCreationTimestamp() {
+               return creationInstant == null ? null : new Date(
+                               creationInstant.getTime());
+       }
+
+       @Override
+       public Date getFinishTimestamp() {
+               try {
+                       return run.getFinishTimestamp();
+               } catch (RemoteException e) {
+                       log.info("failed to get finish timestamp", e);
+                       return null;
+               }
+       }
+
+       @Override
+       public Date getStartTimestamp() {
+               try {
+                       return run.getStartTimestamp();
+               } catch (RemoteException e) {
+                       log.info("failed to get finish timestamp", e);
+                       return null;
+               }
+       }
+
+       /**
+        * @param readers
+        *            the readers to set
+        */
+       public void setReaders(Set<String> readers) {
+               this.readers = new HashSet<>(readers);
+               db.flushToDisk(this);
+       }
+
+       /**
+        * @return the readers
+        */
+       public Set<String> getReaders() {
+               return readers == null ? new HashSet<String>()
+                               : unmodifiableSet(readers);
+       }
+
+       /**
+        * @param writers
+        *            the writers to set
+        */
+       public void setWriters(Set<String> writers) {
+               this.writers = new HashSet<>(writers);
+               db.flushToDisk(this);
+       }
+
+       /**
+        * @return the writers
+        */
+       public Set<String> getWriters() {
+               return writers == null ? new HashSet<String>()
+                               : unmodifiableSet(writers);
+       }
+
+       /**
+        * @param destroyers
+        *            the destroyers to set
+        */
+       public void setDestroyers(Set<String> destroyers) {
+               this.destroyers = new HashSet<>(destroyers);
+               db.flushToDisk(this);
+       }
+
+       /**
+        * @return the destroyers
+        */
+       public Set<String> getDestroyers() {
+               return destroyers == null ? new HashSet<String>()
+                               : unmodifiableSet(destroyers);
+       }
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.defaultWriteObject();
+               out.writeUTF(secContext.getOwner().getName());
+               out.writeObject(secContext.getFactory());
+               out.writeObject(new MarshalledObject<>(run));
+       }
+
+       @Override
+       public boolean getGenerateProvenance() {
+               return generateProvenance;
+       }
+
+       @Override
+       public void setGenerateProvenance(boolean generateProvenance) {
+               this.generateProvenance = generateProvenance;
+               db.flushToDisk(this);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException,
+                       ClassNotFoundException {
+               in.defaultReadObject();
+               if (log == null)
+                       log = getLog("Taverna.Server.LocalWorker");
+               final String creatorName = in.readUTF();
+               SecurityContextFactory factory = (SecurityContextFactory) in
+                               .readObject();
+               try {
+                       secContext = factory.create(this,
+                                       new UsernamePrincipal(creatorName));
+               } catch (RuntimeException | IOException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new SecurityContextReconstructionException(e);
+               }
+               run = ((MarshalledObject<RemoteSingleRun>) 
in.readObject()).get();
+       }
+
+       public void setSecurityContext(TavernaSecurityContext 
tavernaSecurityContext) {
+               secContext = tavernaSecurityContext;
+       }
+
+       @Override
+       public String getName() {
+               return name;
+       }
+
+       @Override
+       public void setName(@Nonnull String name) {
+               if (name.length() > RunConnection.NAME_LENGTH)
+                       this.name = name.substring(0, 
RunConnection.NAME_LENGTH);
+               else
+                       this.name = name;
+               db.flushToDisk(this);
+       }
+
+       @Override
+       public void ping() throws UnknownRunException {
+               try {
+                       run.ping();
+               } catch (RemoteException e) {
+                       throw new UnknownRunException(e);
+               }
+       }
+}
+
+abstract class DEDelegate implements DirectoryEntry {
+       Log log = getLog("Taverna.Server.Worker");
+       private RemoteDirectoryEntry entry;
+       private String name;
+       private String full;
+       private Date cacheModTime;
+       private long cacheQueryTime = 0L;
+
+       DEDelegate(RemoteDirectoryEntry entry) {
+               this.entry = entry;
+       }
+
+       @Override
+       public void destroy() throws FilesystemAccessException {
+               try {
+                       entry.destroy();
+               } catch (IOException e) {
+                       throw new FilesystemAccessException(
+                                       "failed to delete directory entry", e);
+               }
+       }
+
+       @Override
+       public String getFullName() {
+               if (full != null)
+                       return full;
+               String n = getName();
+               RemoteDirectoryEntry re = entry;
+               try {
+                       while (true) {
+                               RemoteDirectory parent = 
re.getContainingDirectory();
+                               if (parent == null)
+                                       break;
+                               n = parent.getName() + "/" + n;
+                               re = parent;
+                       }
+               } catch (RemoteException e) {
+                       log.warn("failed to generate full name", e);
+               }
+               return (full = n);
+       }
+
+       @Override
+       public String getName() {
+               if (name == null)
+                       try {
+                               name = entry.getName();
+                       } catch (RemoteException e) {
+                               log.error("failed to get name", e);
+                       }
+               return name;
+       }
+
+       @Override
+       public Date getModificationDate() {
+               if (cacheModTime == null || currentTimeMillis() - 
cacheQueryTime < 5000)
+                       try {
+                               cacheModTime = entry.getModificationDate();
+                               cacheQueryTime = currentTimeMillis();
+                       } catch (RemoteException e) {
+                               log.error("failed to get modification time", e);
+                       }
+               return cacheModTime;
+       }
+
+       @Override
+       public int compareTo(DirectoryEntry de) {
+               return getFullName().compareTo(de.getFullName());
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               return o != null && o instanceof DEDelegate
+                               && getFullName().equals(((DEDelegate) 
o).getFullName());
+       }
+
+       @Override
+       public int hashCode() {
+               return getFullName().hashCode();
+       }
+}
+
+class DirectoryDelegate extends DEDelegate implements Directory {
+       RemoteDirectory rd;
+
+       DirectoryDelegate(RemoteDirectory dir) {
+               super(dir);
+               rd = dir;
+       }
+
+       @Override
+       public Collection<DirectoryEntry> getContents()
+                       throws FilesystemAccessException {
+               ArrayList<DirectoryEntry> result = new ArrayList<>();
+               try {
+                       for (RemoteDirectoryEntry rde : rd.getContents()) {
+                               if (rde instanceof RemoteDirectory)
+                                       result.add(new 
DirectoryDelegate((RemoteDirectory) rde));
+                               else
+                                       result.add(new 
FileDelegate((RemoteFile) rde));
+                       }
+               } catch (IOException e) {
+                       throw new FilesystemAccessException(
+                                       "failed to get directory contents", e);
+               }
+               return result;
+       }
+
+       @Override
+       public Collection<DirectoryEntry> getContentsByDate()
+                       throws FilesystemAccessException {
+               ArrayList<DirectoryEntry> result = new 
ArrayList<>(getContents());
+               sort(result, new DateComparator());
+               return result;
+       }
+
+       static class DateComparator implements Comparator<DirectoryEntry> {
+               @Override
+               public int compare(DirectoryEntry a, DirectoryEntry b) {
+                       return 
a.getModificationDate().compareTo(b.getModificationDate());
+               }
+       }
+
+       @Override
+       public File makeEmptyFile(Principal actor, String name)
+                       throws FilesystemAccessException {
+               try {
+                       return new FileDelegate(rd.makeEmptyFile(name));
+               } catch (IOException e) {
+                       throw new FilesystemAccessException("failed to make 
empty file", e);
+               }
+       }
+
+       @Override
+       public Directory makeSubdirectory(Principal actor, String name)
+                       throws FilesystemAccessException {
+               try {
+                       return new DirectoryDelegate(rd.makeSubdirectory(name));
+               } catch (IOException e) {
+                       throw new FilesystemAccessException("failed to make 
subdirectory",
+                                       e);
+               }
+       }
+
+       @Override
+       public ZipStream getContentsAsZip() throws FilesystemAccessException {
+               ZipStream zs = new ZipStream();
+
+               final ZipOutputStream zos;
+               try {
+                       zos = new ZipOutputStream(new PipedOutputStream(zs));
+               } catch (IOException e) {
+                       throw new FilesystemAccessException("problem building 
zip stream",
+                                       e);
+               }
+               Thread t = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       zipDirectory(rd, null, zos);
+                               } catch (IOException e) {
+                                       log.warn("problem when zipping 
directory", e);
+                               } finally {
+                                       closeQuietly(zos);
+                               }
+                       }
+               });
+               t.setDaemon(true);
+               t.start();
+               return zs;
+       }
+
+       /**
+        * Compresses a directory tree into a ZIP.
+        * 
+        * @param dir
+        *            The directory to compress.
+        * @param base
+        *            The base name of the directory (or <tt>null</tt> if this 
is
+        *            the root directory of the ZIP).
+        * @param zos
+        *            Where to write the compressed data.
+        * @throws RemoteException
+        *             If some kind of problem happens with the remote 
delegates.
+        * @throws IOException
+        *             If we run into problems with reading or writing data.
+        */
+       void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos)
+                       throws RemoteException, IOException {
+               for (RemoteDirectoryEntry rde : dir.getContents()) {
+                       String name = rde.getName();
+                       if (base != null)
+                               name = base + "/" + name;
+                       if (rde instanceof RemoteDirectory) {
+                               RemoteDirectory rd = (RemoteDirectory) rde;
+                               zipDirectory(rd, name, zos);
+                       } else {
+                               RemoteFile rf = (RemoteFile) rde;
+                               zos.putNextEntry(new ZipEntry(name));
+                               try {
+                                       int off = 0;
+                                       while (true) {
+                                               byte[] c = rf.getContents(off, 
64 * 1024);
+                                               if (c == null || c.length == 0)
+                                                       break;
+                                               zos.write(c);
+                                               off += c.length;
+                                       }
+                               } finally {
+                                       zos.closeEntry();
+                               }
+                       }
+               }
+       }
+}
+
+class FileDelegate extends DEDelegate implements File {
+       RemoteFile rf;
+
+       FileDelegate(RemoteFile f) {
+               super(f);
+               this.rf = f;
+       }
+
+       @Override
+       public byte[] getContents(int offset, int length)
+                       throws FilesystemAccessException {
+               try {
+                       return rf.getContents(offset, length);
+               } catch (IOException e) {
+                       throw new FilesystemAccessException("failed to read 
file contents",
+                                       e);
+               }
+       }
+
+       @Override
+       public long getSize() throws FilesystemAccessException {
+               try {
+                       return rf.getSize();
+               } catch (IOException e) {
+                       throw new FilesystemAccessException("failed to get file 
length", e);
+               }
+       }
+
+       @Override
+       public void setContents(byte[] data) throws FilesystemAccessException {
+               try {
+                       rf.setContents(data);
+               } catch (IOException e) {
+                       throw new FilesystemAccessException(
+                                       "failed to write file contents", e);
+               }
+       }
+
+       @Override
+       public void appendContents(byte[] data) throws 
FilesystemAccessException {
+               try {
+                       rf.appendContents(data);
+               } catch (IOException e) {
+                       throw new FilesystemAccessException(
+                                       "failed to write file contents", e);
+               }
+       }
+
+       @Override
+       public void copy(File from) throws FilesystemAccessException {
+               FileDelegate fromFile;
+               try {
+                       fromFile = (FileDelegate) from;
+               } catch (ClassCastException e) {
+                       throw new FilesystemAccessException("different types of 
File?!");
+               }
+
+               try {
+                       rf.copy(fromFile.rf);
+               } catch (Exception e) {
+                       throw new FilesystemAccessException("failed to copy 
file contents",
+                                       e);
+               }
+               return;
+       }
+}
+
+class ListenerDelegate implements Listener {
+       private Log log = getLog("Taverna.Server.Worker");
+       private RemoteListener r;
+       String conf;
+
+       ListenerDelegate(RemoteListener l) {
+               r = l;
+       }
+
+       RemoteListener getRemote() {
+               return r;
+       }
+
+       @Override
+       public String getConfiguration() {
+               try {
+                       if (conf == null)
+                               conf = r.getConfiguration();
+               } catch (RemoteException e) {
+                       log.warn("failed to get configuration", e);
+               }
+               return conf;
+       }
+
+       @Override
+       public String getName() {
+               try {
+                       return r.getName();
+               } catch (RemoteException e) {
+                       log.warn("failed to get name", e);
+                       return "UNKNOWN NAME";
+               }
+       }
+
+       @Override
+       public String getProperty(String propName) throws NoListenerException {
+               try {
+                       return r.getProperty(propName);
+               } catch (RemoteException e) {
+                       throw new NoListenerException("no such property: " + 
propName, e);
+               }
+       }
+
+       @Override
+       public String getType() {
+               try {
+                       return r.getType();
+               } catch (RemoteException e) {
+                       log.warn("failed to get type", e);
+                       return "UNKNOWN TYPE";
+               }
+       }
+
+       @Override
+       public String[] listProperties() {
+               try {
+                       return r.listProperties();
+               } catch (RemoteException e) {
+                       log.warn("failed to list properties", e);
+                       return new String[0];
+               }
+       }
+
+       @Override
+       public void setProperty(String propName, String value)
+                       throws NoListenerException, BadPropertyValueException {
+               try {
+                       r.setProperty(propName, value);
+               } catch (RemoteException e) {
+                       log.warn("failed to set property", e);
+                       if (e.getCause() != null
+                                       && e.getCause() instanceof 
RuntimeException)
+                               throw new NoListenerException("failed to set 
property",
+                                               e.getCause());
+                       if (e.getCause() != null && e.getCause() instanceof 
Exception)
+                               throw new BadPropertyValueException("failed to 
set property",
+                                               e.getCause());
+                       throw new BadPropertyValueException("failed to set 
property", e);
+               }
+       }
+}
+
+class RunInput implements Input {
+       private final RemoteInput i;
+
+       RunInput(RemoteInput remote) {
+               this.i = remote;
+       }
+
+       @Override
+       public String getFile() {
+               try {
+                       return i.getFile();
+               } catch (RemoteException e) {
+                       return null;
+               }
+       }
+
+       @Override
+       public String getName() {
+               try {
+                       return i.getName();
+               } catch (RemoteException e) {
+                       return null;
+               }
+       }
+
+       @Override
+       public String getValue() {
+               try {
+                       return i.getValue();
+               } catch (RemoteException e) {
+                       return null;
+               }
+       }
+
+       @Override
+       public void setFile(String file) throws FilesystemAccessException,
+                       BadStateChangeException {
+               checkBadFilename(file);
+               try {
+                       i.setFile(file);
+               } catch (RemoteException e) {
+                       throw new FilesystemAccessException("cannot set file 
for input", e);
+               }
+       }
+
+       @Override
+       public void setValue(String value) throws BadStateChangeException {
+               try {
+                       i.setValue(value);
+               } catch (RemoteException e) {
+                       throw new BadStateChangeException(e);
+               }
+       }
+
+       @Override
+       public String getDelimiter() {
+               try {
+                       return i.getDelimiter();
+               } catch (RemoteException e) {
+                       return null;
+               }
+       }
+
+       @Override
+       public void setDelimiter(String delimiter) throws 
BadStateChangeException {
+               try {
+                       if (delimiter != null)
+                               delimiter = delimiter.substring(0, 1);
+                       i.setDelimiter(delimiter);
+               } catch (RemoteException e) {
+                       throw new BadStateChangeException(e);
+               }
+       }
+}
+
+@SuppressWarnings("serial")
+class SecurityContextReconstructionException extends RuntimeException {
+       public SecurityContextReconstructionException(Throwable t) {
+               super("failed to rebuild security context", t);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
new file mode 100644
index 0000000..0c2b1a9
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
@@ -0,0 +1,252 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY;
+import static org.taverna.server.master.worker.RunConnection.SCHEMA;
+import static org.taverna.server.master.worker.RunConnection.TABLE;
+import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY;
+import static 
org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY;
+
+import java.io.IOException;
+import java.rmi.MarshalledObject;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.master.common.Credential;
+import org.taverna.server.master.common.Trust;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The representation of the connections to the runs that actually participates
+ * in the persistence system.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceCapable(table = TABLE, schema = SCHEMA)
+@Queries({
+               @Query(name = "count", language = "SQL", value = COUNT_QUERY, 
unique = "true", resultClass = Integer.class),
+               @Query(name = "names", language = "SQL", value = NAMES_QUERY, 
unique = "false", resultClass = String.class),
+               @Query(name = "unterminated", language = "SQL", value = 
UNTERMINATED_QUERY, unique = "false", resultClass = String.class),
+               @Query(name = "timedout", language = "SQL", value = 
TIMEOUT_QUERY, unique = "false", resultClass = String.class) })
+public class RunConnection {
+       static final String SCHEMA = "TAVERNA";
+       static final String TABLE = "RUN_CONNECTION";
+       private static final String FULL_NAME = SCHEMA + "." + TABLE;
+       static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME;
+       static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME;
+       static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME
+                       + "   WHERE expiry < CURRENT_TIMESTAMP";
+       static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME
+                       + "   WHERE doneTransitionToFinished = 0";
+       static final int NAME_LENGTH = 48; 
+
+       @PrimaryKey
+       @Column(length = 40)
+       private String id;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Column(length = NAME_LENGTH)
+       private String name;
+
+       @Persistent(defaultFetchGroup = "true")
+       private Date creationInstant;
+
+       @Persistent(defaultFetchGroup = "true", serialized = "true")
+       @Column(jdbcType = "BLOB", sqlType = "BLOB")
+       private Workflow workflow;
+
+       @Persistent(defaultFetchGroup = "true")
+       private Date expiry;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Join(table = TABLE + "_READERS", column = "ID")
+       private String[] readers;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Join(table = TABLE + "_WRITERS", column = "ID")
+       private String[] writers;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Join(table = TABLE + "_DESTROYERS", column = "ID")
+       private String[] destroyers;
+
+       @Persistent(defaultFetchGroup = "true", serialized = "true")
+       @Column(jdbcType = "BLOB", sqlType = "BLOB")
+       private MarshalledObject<RemoteSingleRun> run;
+
+       @Persistent(defaultFetchGroup = "true")
+       private int doneTransitionToFinished;
+
+       @Persistent(defaultFetchGroup = "true")
+       private int generateProvenance;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Column(length = 128)
+       String owner;
+
+       @Persistent(defaultFetchGroup = "true")
+       @Column(length = 36)
+       private String securityToken;
+
+       @Persistent(defaultFetchGroup = "true", serialized = "true")
+       @Column(jdbcType = "BLOB", sqlType = "BLOB")
+       private SecurityContextFactory securityContextFactory;
+       @Persistent(defaultFetchGroup = "true", serialized = "true")
+       @Column(jdbcType = "BLOB", sqlType = "BLOB")
+       private Credential[] credentials;
+       @Persistent(defaultFetchGroup = "true", serialized = "true")
+       @Column(jdbcType = "BLOB", sqlType = "BLOB")
+       private Trust[] trust;
+
+       private static final String[] STRING_ARY = new String[0];
+
+       public String getId() {
+               return id;
+       }
+
+       public boolean isFinished() {
+               return doneTransitionToFinished != 0;
+       }
+
+       public void setFinished(boolean finished) {
+               doneTransitionToFinished = (finished ? 1 : 0);
+       }
+
+       public boolean isProvenanceGenerated() {
+               return generateProvenance != 0;
+       }
+
+       public void setProvenanceGenerated(boolean generate) {
+               generateProvenance = (generate ? 1 : 0);
+       }
+
+       /**
+        * Manufacture a persistent representation of the given workflow run. 
Must
+        * be called within the context of a transaction.
+        * 
+        * @param rrd
+        *            The remote delegate of the workflow run.
+        * @return The persistent object.
+        * @throws IOException
+        *             If serialisation fails.
+        */
+       @Nonnull
+       public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd)
+                       throws IOException {
+               RunConnection rc = new RunConnection();
+               rc.id = rrd.id;
+               rc.makeChanges(rrd);
+               return rc;
+       }
+
+       private static List<String> list(String[] ary) {
+               if (ary == null)
+                       return emptyList();
+               return asList(ary);
+       }
+
+       /**
+        * Get the remote run delegate for a particular persistent connection. 
Must
+        * be called within the context of a transaction.
+        * 
+        * @param db
+        *            The database facade.
+        * @return The delegate object.
+        * @throws Exception
+        *             If anything goes wrong.
+        */
+       @Nonnull
+       public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db)
+                       throws Exception {
+               RemoteRunDelegate rrd = new RemoteRunDelegate();
+               rrd.id = getId();
+               rrd.creationInstant = creationInstant;
+               rrd.workflow = workflow;
+               rrd.expiry = expiry;
+               rrd.readers = new HashSet<>(list(readers));
+               rrd.writers = new HashSet<>(list(writers));
+               rrd.destroyers = new HashSet<>(list(destroyers));
+               rrd.run = run.get();
+               rrd.doneTransitionToFinished = isFinished();
+               rrd.generateProvenance = isProvenanceGenerated();
+               rrd.secContext = securityContextFactory.create(rrd,
+                               new UsernamePrincipal(owner));
+               
((SecurityContextDelegate)rrd.secContext).setCredentialsAndTrust(credentials,trust);
+               rrd.db = db;
+               rrd.factory = db.getFactory();
+               rrd.name = name;
+               return rrd;
+       }
+
+       /**
+        * Flush changes from a remote run delegate to the database. Must be 
called
+        * within the context of a transaction.
+        * 
+        * @param rrd
+        *            The remote run delegate object that has potential changes.
+        * @throws IOException
+        *             If anything goes wrong in serialization.
+        */
+       public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws 
IOException {
+               // Properties that are set exactly once
+               if (creationInstant == null) {
+                       creationInstant = rrd.getCreationTimestamp();
+                       workflow = rrd.getWorkflow();
+                       run = new MarshalledObject<>(rrd.run);
+                       securityContextFactory = 
rrd.getSecurityContext().getFactory();
+                       owner = rrd.getSecurityContext().getOwner().getName();
+                       securityToken = 
((org.taverna.server.master.worker.SecurityContextFactory) 
securityContextFactory)
+                                       .issueNewPassword();
+               }
+               // Properties that are set multiple times
+               expiry = rrd.getExpiry();
+               readers = rrd.getReaders().toArray(STRING_ARY);
+               writers = rrd.getWriters().toArray(STRING_ARY);
+               destroyers = rrd.getDestroyers().toArray(STRING_ARY);
+               credentials = rrd.getSecurityContext().getCredentials();
+               trust = rrd.getSecurityContext().getTrusted();
+               if (rrd.name.length() > NAME_LENGTH)
+                       this.name = rrd.name.substring(0, NAME_LENGTH);
+               else
+                       this.name = rrd.name;
+               setFinished(rrd.doneTransitionToFinished);
+               setProvenanceGenerated(rrd.generateProvenance);
+       }
+
+       public String getSecurityToken() {
+               return securityToken;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
new file mode 100644
index 0000000..5fa96b8
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
@@ -0,0 +1,96 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.taverna.server.master.notification.NotificationEngine;
+
+/**
+ * The interface to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public interface RunDBSupport {
+       /**
+        * Scan each run to see if it has finished yet and issue registered
+        * notifications if it has.
+        */
+       void checkForFinishNow();
+
+       /**
+        * Remove currently-expired runs from this database.
+        */
+       void cleanNow();
+
+       /**
+        * How many runs are stored in the database.
+        * 
+        * @return The current size of the run table.
+        */
+       int countRuns();
+
+       /**
+        * Ensure that a run gets persisted in the database. It is assumed that 
the
+        * value is already in there.
+        * 
+        * @param run
+        *            The run to persist.
+        */
+       void flushToDisk(@Nonnull RemoteRunDelegate run);
+
+       /**
+        * Select an arbitrary representative run.
+        * 
+        * @return The selected run.
+        * @throws Exception
+        *             If anything goes wrong.
+        */
+       @Nullable
+       RemoteRunDelegate pickArbitraryRun() throws Exception;
+
+       /**
+        * Get a list of all the run names.
+        * 
+        * @return The names (i.e., UUIDs) of all the runs.
+        */
+       @Nonnull
+       List<String> listRunNames();
+
+       /**
+        * @param notificationEngine
+        *            A reference to the notification fabric bean.
+        */
+       void setNotificationEngine(NotificationEngine notificationEngine);
+
+       /**
+        * @param notifier
+        *            A reference to the bean that creates messages about 
workflow
+        *            run termination.
+        */
+       void setNotifier(CompletionNotifier notifier);
+
+       /**
+        * @return A reference to the actual factory for remote runs.
+        */
+       FactoryBean getFactory();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
new file mode 100644
index 0000000..65aec70
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
@@ -0,0 +1,324 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Integer.parseInt;
+import static java.util.UUID.randomUUID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.RunStore;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.notification.NotificationEngine;
+import org.taverna.server.master.notification.NotificationEngine.Message;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The main facade bean that interfaces to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public class RunDatabase implements RunStore, RunDBSupport {
+       private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+       RunDatabaseDAO dao;
+       CompletionNotifier backupNotifier;
+       Map<String, CompletionNotifier> typedNotifiers;
+       private NotificationEngine notificationEngine;
+       @Autowired
+       private FactoryBean factory;
+       private Map<String, TavernaRun> cache = new HashMap<>();
+
+       @Override
+       @Required
+       public void setNotifier(CompletionNotifier n) {
+               backupNotifier = n;
+       }
+
+       public void setTypeNotifiers(List<CompletionNotifier> notifiers) {
+               typedNotifiers = new HashMap<>();
+               for (CompletionNotifier n : notifiers)
+                       typedNotifiers.put(n.getName(), n);
+       }
+
+       @Required
+       @Override
+       public void setNotificationEngine(NotificationEngine 
notificationEngine) {
+               this.notificationEngine = notificationEngine;
+       }
+
+       @Required
+       public void setDao(RunDatabaseDAO dao) {
+               this.dao = dao;
+       }
+
+       @Override
+       public void checkForFinishNow() {
+               /*
+                * Get which runs are actually newly finished; this requires 
getting the
+                * candidates from the database and *then* doing the expensive 
requests
+                * to the back end to find out the status.
+                */
+               Map<String, RemoteRunDelegate> notifiable = new HashMap<>();
+               for (RemoteRunDelegate p : dao.getPotentiallyNotifiable())
+                       if (p.getStatus() == Status.Finished)
+                               notifiable.put(p.getId(), p);
+
+               // Check if there's nothing more to do
+               if (notifiable.isEmpty())
+                       return;
+
+               /*
+                * Tell the database about the ones we've got.
+                */
+               dao.markFinished(notifiable.keySet());
+
+               /*
+                * Send out the notifications. The notification addresses are 
stored in
+                * the back-end engine, so this is *another* thing that can 
take time.
+                */
+               for (RemoteRunDelegate rrd : notifiable.values())
+                       for (Listener l : rrd.getListeners())
+                               if (l.getName().equals("io")) {
+                                       try {
+                                               notifyFinished(rrd.id, l, rrd);
+                                       } catch (Exception e) {
+                                               log.warn("failed to do 
notification of completion", e);
+                                       }
+                                       break;
+                               }
+       }
+
+       @Override
+       public void cleanNow() {
+               List<String> cleaned;
+               try {
+                       cleaned = dao.doClean();
+               } catch (Exception e) {
+                       log.warn("failure during deletion of expired runs", e);
+                       return;
+               }
+               synchronized (cache) {
+                       for (String id : cleaned)
+                               cache.remove(id);
+               }
+       }
+
+       @Override
+       public int countRuns() {
+               return dao.countRuns();
+       }
+
+       @Override
+       public void flushToDisk(RemoteRunDelegate run) {
+               try {
+                       dao.flushToDisk(run);
+               } catch (IOException e) {
+                       throw new RuntimeException(
+                                       "unexpected problem when persisting run 
record in database",
+                                       e);
+               }
+       }
+
+       @Override
+       public RemoteRunDelegate pickArbitraryRun() throws Exception {
+               return dao.pickArbitraryRun();
+       }
+
+       @Override
+       public List<String> listRunNames() {
+               return dao.listRunNames();
+       }
+
+       @Nullable
+       private TavernaRun get(String uuid) {
+               TavernaRun run = null;
+               synchronized (cache) {
+                       run = cache.get(uuid);
+               }
+               try {
+                       if (run != null)
+                               run.ping();
+               } catch (UnknownRunException e) {
+                       if (log.isDebugEnabled())
+                               log.debug("stale mapping in cache?", e);
+                       // Don't need to flush the cache; this happens when 
cleaning anyway
+                       run = null;
+               }
+               if (run == null)
+                       run = dao.get(uuid);
+               return run;
+       }
+
+       @Override
+       public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid)
+                       throws UnknownRunException {
+               // Check first to see if the 'uuid' actually looks like a UUID; 
if
+               // not, throw it out immediately without logging an exception.
+               try {
+                       UUID.fromString(uuid);
+               } catch (IllegalArgumentException e) {
+                       if (log.isDebugEnabled())
+                               log.debug("run ID does not look like UUID; 
rejecting...");
+                       throw new UnknownRunException();
+               }
+               TavernaRun run = get(uuid);
+               if (run != null && (user == null || p.permitAccess(user, run)))
+                       return run;
+               throw new UnknownRunException();
+       }
+
+       @Override
+       public TavernaRun getRun(String uuid) throws UnknownRunException {
+               TavernaRun run = get(uuid);
+               if (run != null)
+                       return run;
+               throw new UnknownRunException();
+       }
+
+       @Override
+       public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy 
p) {
+               synchronized (cache) {
+                       Map<String, TavernaRun> cached = new HashMap<>();
+                       for (Entry<String, TavernaRun> e : cache.entrySet()) {
+                               TavernaRun r = e.getValue();
+                               if (p.permitAccess(user, r))
+                                       cached.put(e.getKey(), r);
+                       }
+                       if (!cached.isEmpty())
+                               return cached;
+               }
+               return dao.listRuns(user, p);
+       }
+
+       private void logLength(String message, Object obj) {
+               if (!log.isDebugEnabled())
+                       return;
+               try {
+                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                       try (ObjectOutputStream oos = new 
ObjectOutputStream(baos)) {
+                               oos.writeObject(obj);
+                       }
+                       log.debug(message + ": " + baos.size());
+               } catch (Exception e) {
+                       log.warn("oops", e);
+               }
+       }
+
+       @Override
+       public String registerRun(TavernaRun run) {
+               if (!(run instanceof RemoteRunDelegate))
+                       throw new IllegalArgumentException(
+                                       "run must be created by localworker 
package");
+               RemoteRunDelegate rrd = (RemoteRunDelegate) run;
+               if (rrd.id == null)
+                       rrd.id = randomUUID().toString();
+               logLength("RemoteRunDelegate serialized length", rrd);
+               try {
+                       dao.persistRun(rrd);
+               } catch (IOException e) {
+                       throw new RuntimeException(
+                                       "unexpected problem when persisting run 
record in database",
+                                       e);
+               }
+               synchronized (cache) {
+                       cache.put(rrd.getId(), run);
+               }
+               return rrd.getId();
+       }
+
+       @Override
+       public void unregisterRun(String uuid) {
+               try {
+                       if (dao.unpersistRun(uuid))
+                               synchronized (cache) {
+                                       cache.remove(uuid);
+                               }
+               } catch (RuntimeException e) {
+                       if (log.isDebugEnabled())
+                               log.debug("problem persisting the deletion of 
the run " + uuid,
+                                               e);
+               }
+       }
+
+       /**
+        * Process the event that a run has finished.
+        * 
+        * @param name
+        *            The name of the run.
+        * @param io
+        *            The io listener of the run (used to get information about 
the
+        *            run).
+        * @param run
+        *            The handle to the run.
+        * @throws Exception
+        *             If anything goes wrong.
+        */
+       private void notifyFinished(final String name, Listener io,
+                       final RemoteRunDelegate run) throws Exception {
+               String to = io.getProperty("notificationAddress");
+               final int code;
+               try {
+                       code = parseInt(io.getProperty("exitcode"));
+               } catch (NumberFormatException nfe) {
+                       // Ignore; not much we can do here...
+                       return;
+               }
+
+               notificationEngine.dispatchMessage(run, to, new Message() {
+                       private CompletionNotifier getNotifier(String type) {
+                               CompletionNotifier n = typedNotifiers.get(type);
+                               if (n == null)
+                                       n = backupNotifier;
+                               return n;
+                       }
+
+                       @Override
+                       public String getContent(String type) {
+                               return 
getNotifier(type).makeCompletionMessage(name, run, code);
+                       }
+
+                       @Override
+                       public String getTitle(String type) {
+                               return 
getNotifier(type).makeMessageSubject(name, run, code);
+                       }
+               });
+       }
+
+       @Override
+       public FactoryBean getFactory() {
+               return factory;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
----------------------------------------------------------------------
diff --git 
a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
new file mode 100644
index 0000000..1c75d22
--- /dev/null
+++ 
b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
@@ -0,0 +1,323 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.worker.RunConnection.toDBform;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.utils.CallTimeLogger.PerfLogged;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * This handles storing runs, interfacing with the underlying state engine as
+ * necessary.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class RunDatabaseDAO extends JDOSupport<RunConnection> {
+       public RunDatabaseDAO() {
+               super(RunConnection.class);
+       }
+
+       private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+       private RunDatabase facade;
+
+       @Required
+       public void setFacade(RunDatabase facade) {
+               this.facade = facade;
+       }
+
+       // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+       @SuppressWarnings("unchecked")
+       private List<String> names() {
+               if (log.isDebugEnabled())
+                       log.debug("fetching all run names");
+               return (List<String>) namedQuery("names").execute();
+       }
+
+       /**
+        * @return The number of workflow runs in the database.
+        */
+       @WithinSingleTransaction
+       public int countRuns() {
+               if (log.isDebugEnabled())
+                       log.debug("counting the number of runs");
+               return count();
+       }
+
+       private Integer count() {
+               return (Integer) namedQuery("count").execute();
+       }
+
+       @SuppressWarnings("unchecked")
+       private List<String> timedout() {
+               return (List<String>) namedQuery("timedout").execute();
+       }
+
+       @SuppressWarnings("unchecked")
+       private List<String> unterminated() {
+               return (List<String>) namedQuery("unterminated").execute();
+       }
+
+       @Nullable
+       private RunConnection pickRun(@Nonnull String name) {
+               if (log.isDebugEnabled())
+                       log.debug("fetching the run called " + name);
+               try {
+                       RunConnection rc = getById(name);
+                       if (rc == null)
+                               log.warn("no result for " + name);
+                       return rc;
+               } catch (RuntimeException e) {
+                       log.warn("problem in fetch", e);
+                       throw e;
+               }
+       }
+
+       @Nullable
+       @WithinSingleTransaction
+       public String getSecurityToken(@Nonnull String name) {
+               RunConnection rc = getById(name);
+               if (rc == null)
+                       return null;
+               return rc.getSecurityToken();
+       }
+
+       private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException 
{
+               persist(toDBform(rrd));
+       }
+
+       @Nonnull
+       private List<RunConnection> allRuns() {
+               try {
+                       List<RunConnection> rcs = new ArrayList<>();
+                       List<String> names = names();
+                       for (String id : names) {
+                               try {
+                                       if (id != null)
+                                               rcs.add(pickRun(id));
+                               } catch (RuntimeException e) {
+                                       continue;
+                               }
+                       }
+                       return rcs;
+               } catch (RuntimeException e) {
+                       log.warn("problem in fetch", e);
+                       throw e;
+               }
+       }
+
+       // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+       /**
+        * Obtain a workflow run handle.
+        * 
+        * @param name
+        *            The identifier of the run.
+        * @return The run handle, or <tt>null</tt> if there is no such run.
+        */
+       @Nullable
+       @WithinSingleTransaction
+       public TavernaRun get(String name) {
+               try {
+                       RunConnection rc = pickRun(name);
+                       return (rc == null) ? null : rc.fromDBform(facade);
+               } catch (Exception e) {
+                       return null;
+               }
+       }
+
+       /**
+        * Get the runs that a user can read things from.
+        * 
+        * @param user
+        *            Who is asking?
+        * @param p
+        *            The policy that determines what they can see.
+        * @return A mapping from run IDs to run handles.
+        */
+       @Nonnull
+       @WithinSingleTransaction
+       public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy 
p) {
+               Map<String, TavernaRun> result = new HashMap<>();
+               for (String id : names())
+                       try {
+                               RemoteRunDelegate rrd = 
pickRun(id).fromDBform(facade);
+                               if (p.permitAccess(user, rrd))
+                                       result.put(id, rrd);
+                       } catch (Exception e) {
+                               continue;
+                       }
+               return result;
+       }
+
+       /**
+        * @return A list of the IDs for all workflow runs.
+        */
+       @Nonnull
+       @WithinSingleTransaction
+       public List<String> listRunNames() {
+               List<String> runNames = new ArrayList<>();
+               for (RunConnection rc : allRuns())
+                       if (rc.getId() != null)
+                               runNames.add(rc.getId());
+               return runNames;
+       }
+
+       /**
+        * @return An arbitrary, representative workflow run.
+        * @throws Exception
+        *             If anything goes wrong.
+        */
+       @Nullable
+       @WithinSingleTransaction
+       public RemoteRunDelegate pickArbitraryRun() throws Exception {
+               for (RunConnection rc : allRuns()) {
+                       if (rc.getId() == null)
+                               continue;
+                       return rc.fromDBform(facade);
+               }
+               return null;
+       }
+
+       /**
+        * Make a workflow run persistent. Must only be called once per workflow
+        * run.
+        * 
+        * @param rrd
+        *            The workflow run to persist.
+        * @throws IOException
+        *             If anything goes wrong with serialisation of the run.
+        */
+       @WithinSingleTransaction
+       public void persistRun(@Nonnull RemoteRunDelegate rrd) throws 
IOException {
+               persist(rrd);
+       }
+
+       /**
+        * Stop a workflow run from being persistent.
+        * 
+        * @param name
+        *            The ID of the run.
+        * @return Whether a deletion happened.
+        */
+       @WithinSingleTransaction
+       public boolean unpersistRun(String name) {
+               RunConnection rc = pickRun(name);
+               if (rc != null)
+                       delete(rc);
+               return rc != null;
+       }
+
+       /**
+        * Ensure that the given workflow run is synchronized with the database.
+        * 
+        * @param run
+        *            The run to synchronise.
+        * @throws IOException
+        *             If serialization of anything fails.
+        */
+       @WithinSingleTransaction
+       public void flushToDisk(@Nonnull RemoteRunDelegate run) throws 
IOException {
+               getById(run.id).makeChanges(run);
+       }
+
+       /**
+        * Remove all workflow runs that have expired.
+        * 
+        * @return The ids of the deleted runs.
+        */
+       @Nonnull
+       @PerfLogged
+       @WithinSingleTransaction
+       public List<String> doClean() {
+               if (log.isDebugEnabled())
+                       log.debug("deleting runs that timed out before " + new 
Date());
+               List<String> toDelete = timedout();
+               if (log.isDebugEnabled())
+                       log.debug("found " + toDelete.size() + " runs to 
delete");
+               for (String id : toDelete) {
+                       RunConnection rc = getById(id);
+                       try {
+                               rc.fromDBform(facade).run.destroy();
+                       } catch (Exception e) {
+                               if (log.isDebugEnabled())
+                                       log.debug("failed to delete execution 
resource for " + id,
+                                                       e);
+                       }
+                       delete(rc);
+               }
+               return toDelete;
+       }
+
+       /**
+        * @return A list of workflow runs that are candidates for doing
+        *         notification of termination.
+        */
+       @Nonnull
+       @PerfLogged
+       @WithinSingleTransaction
+       public List<RemoteRunDelegate> getPotentiallyNotifiable() {
+               List<RemoteRunDelegate> toNotify = new ArrayList<>();
+               for (String id : unterminated())
+                       try {
+                               RunConnection rc = getById(id);
+                               toNotify.add(rc.fromDBform(facade));
+                       } catch (Exception e) {
+                               log.warn("failed to fetch connection token"
+                                               + "for notification of 
completion check", e);
+                       }
+               return toNotify;
+       }
+
+       @PerfLogged
+       @WithinSingleTransaction
+       public void markFinished(@Nonnull Set<String> terminated) {
+               for (String id : terminated) {
+                       RunConnection rc = getById(id);
+                       if (rc == null)
+                               continue;
+                       try {
+                               rc.fromDBform(facade).doneTransitionToFinished 
= true;
+                               rc.setFinished(true);
+                       } catch (Exception e) {
+                               log.warn("failed to note termination", e);
+                       }
+               }
+       }
+}

Reply via email to