// Source: http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RemoteRunDelegate.java
// (new file, mode 100644, index 0000000..22158d5)
/*
 * Copyright (C) 2010-2013 The University of Manchester
 *
 * See the file "LICENSE" for license terms.
 */
package org.taverna.server.master.worker;

import static java.lang.System.currentTimeMillis;
import static java.util.Calendar.MINUTE;
import static java.util.Collections.sort;
import static java.util.Collections.unmodifiableSet;
import static java.util.UUID.randomUUID;
import static org.apache.commons.io.IOUtils.closeQuietly;
import static org.apache.commons.logging.LogFactory.getLog;
import static org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename;
import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PipedOutputStream;
import java.rmi.MarshalledObject;
import java.rmi.RemoteException;
import java.security.GeneralSecurityException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.annotation.Nonnull;

import org.apache.commons.logging.Log;
import org.taverna.server.localworker.remote.IllegalStateTransitionException;
import org.taverna.server.localworker.remote.ImplementationException;
import org.taverna.server.localworker.remote.RemoteDirectory;
import org.taverna.server.localworker.remote.RemoteDirectoryEntry;
import org.taverna.server.localworker.remote.RemoteFile;
import org.taverna.server.localworker.remote.RemoteInput;
import org.taverna.server.localworker.remote.RemoteListener;
import org.taverna.server.localworker.remote.RemoteSingleRun;
import org.taverna.server.localworker.remote.RemoteStatus;
import org.taverna.server.localworker.remote.StillWorkingOnItException;
import org.taverna.server.master.common.Status;
import org.taverna.server.master.common.Workflow;
import org.taverna.server.master.exceptions.BadPropertyValueException;
import org.taverna.server.master.exceptions.BadStateChangeException;
import org.taverna.server.master.exceptions.FilesystemAccessException;
import org.taverna.server.master.exceptions.NoListenerException;
import org.taverna.server.master.exceptions.OverloadedException;
import org.taverna.server.master.exceptions.UnknownRunException;
import org.taverna.server.master.interfaces.Directory;
import org.taverna.server.master.interfaces.DirectoryEntry;
import org.taverna.server.master.interfaces.File;
import org.taverna.server.master.interfaces.Input;
import org.taverna.server.master.interfaces.Listener;
import org.taverna.server.master.interfaces.SecurityContextFactory;
import org.taverna.server.master.interfaces.TavernaRun;
import org.taverna.server.master.interfaces.TavernaSecurityContext;
import org.taverna.server.master.utils.UsernamePrincipal;

/**
 * Bridging shim between the WebApp world and the RMI world. Each operation is
 * forwarded to the {@link RemoteSingleRun} handle held in {@link #run}; remote
 * failures are either logged or translated into the master's own exception
 * types, as noted on each method.
 *
 * @author Donal Fellows
 */
@SuppressWarnings("serial")
public class RemoteRunDelegate implements TavernaRun {
    /** Logging category used by this class, both when built and when restored. */
    private static final String LOG_NAME = "Taverna.Server.Worker";

    private transient Log log = getLog(LOG_NAME);
    transient TavernaSecurityContext secContext;
    Date creationInstant;
    Workflow workflow;
    Date expiry;
    HashSet<String> readers;
    HashSet<String> writers;
    HashSet<String> destroyers;
    transient String id;
    transient RemoteSingleRun run;
    transient RunDBSupport db;
    transient FactoryBean factory;
    boolean doneTransitionToFinished;
    boolean generateProvenance;// FIXME expose
    String name;
    private static final String ELLIPSIS = "...";

    /**
     * Build a delegate around a freshly created remote run.
     *
     * @param creationInstant
     *            When the run was created.
     * @param workflow
     *            The workflow the run executes.
     * @param rsr
     *            The remote run handle; must not be <tt>null</tt>.
     * @param defaultLifetime
     *            Number of minutes from now until the run expires.
     * @param db
     *            The database facade used to persist changes.
     * @param id
     *            The ID of the run, or <tt>null</tt> to have one generated
     *            lazily by {@link #getId()}.
     * @param generateProvenance
     *            Whether provenance generation is requested for the run.
     * @param factory
     *            The factory bean consulted when the run is started.
     * @throws IllegalArgumentException
     *             If <tt>rsr</tt> is <tt>null</tt>.
     */
    public RemoteRunDelegate(Date creationInstant, Workflow workflow,
            RemoteSingleRun rsr, int defaultLifetime, RunDBSupport db, UUID id,
            boolean generateProvenance, FactoryBean factory) {
        if (rsr == null)
            throw new IllegalArgumentException("remote run must not be null");
        this.creationInstant = creationInstant;
        this.workflow = workflow;
        Calendar c = Calendar.getInstance();
        c.add(MINUTE, defaultLifetime);
        this.expiry = c.getTime();
        this.run = rsr;
        this.db = db;
        this.generateProvenance = generateProvenance;
        this.factory = factory;
        try {
            // Default name: workflow name plus creation time, shortened with
            // an ellipsis so that it fits in NAME_LENGTH characters.
            this.name = "";
            String ci = " " + creationInstant;
            String n = workflow.getName();
            if (n.length() > NAME_LENGTH - ci.length())
                n = n.substring(0,
                        NAME_LENGTH - ci.length() - ELLIPSIS.length())
                        + ELLIPSIS;
            this.name = n + ci;
        } catch (Exception e) {
            // Ignore; it's just a name, not something important.
            // (The name stays as the empty string in that case.)
        }
        if (id != null)
            this.id = id.toString();
    }

    /** Creates an empty delegate whose fields are filled in by the caller. */
    RemoteRunDelegate() {
    }

    /**
     * Get the types of listener supported by this run.
     *
     * @return A list of listener type names.
     * @throws RemoteException
     *             If anything goes wrong.
     */
    public List<String> getListenerTypes() throws RemoteException {
        return run.getListenerTypes();
    }

    /**
     * Attach a listener to the remote run. Only {@link ListenerDelegate}
     * instances can be attached; anything else is logged and ignored, as are
     * failures reported by the remote side.
     */
    @Override
    public void addListener(Listener listener) {
        if (listener instanceof ListenerDelegate)
            try {
                run.addListener(((ListenerDelegate) listener).getRemote());
            } catch (RemoteException e) {
                log.warn("communication problem adding listener", e);
            } catch (ImplementationException e) {
                log.warn("implementation problem adding listener", e);
            }
        else
            log.fatal("bad listener " + listener.getClass()
                    + "; not applicable remotely!");
    }

    /**
     * @return The ID of this run; a random UUID is generated on first use if
     *         none was supplied.
     */
    @Override
    public String getId() {
        if (id == null)
            id = randomUUID().toString();
        return id;
    }

    /**
     * Attach a listener to a workflow run and return its local delegate.
     *
     * @param type
     *            The type of listener to create.
     * @param config
     *            The configuration of the listener.
     * @return The local delegate of the listener.
     * @throws NoListenerException
     *             If anything goes wrong.
     */
    public Listener makeListener(String type, String config)
            throws NoListenerException {
        try {
            return new ListenerDelegate(run.makeListener(type, config));
        } catch (RemoteException e) {
            throw new NoListenerException("failed to make listener", e);
        }
    }

    /** Destroy the remote run; failures are logged, not propagated. */
    @Override
    public void destroy() {
        try {
            run.destroy();
        } catch (RemoteException | ImplementationException e) {
            log.warn("failed to destroy run", e);
        }
    }

    /** @return A defensive copy of the expiry time. */
    @Override
    public Date getExpiry() {
        return new Date(expiry.getTime());
    }

    /**
     * @return Delegates for the listeners reported by the remote run; if the
     *         remote call fails, whatever was collected so far is returned.
     */
    @Override
    public List<Listener> getListeners() {
        List<Listener> listeners = new ArrayList<>();
        try {
            for (RemoteListener rl : run.getListeners())
                listeners.add(new ListenerDelegate(rl));
        } catch (RemoteException e) {
            log.warn("failed to get listeners", e);
        }
        return listeners;
    }

    @Override
    public TavernaSecurityContext getSecurityContext() {
        return secContext;
    }

    /**
     * @return The status of the remote run mapped onto {@link Status};
     *         {@link Status#Finished} is reported if the remote side cannot
     *         be asked.
     */
    @Override
    public Status getStatus() {
        try {
            switch (run.getStatus()) {
            case Initialized:
                return Status.Initialized;
            case Operating:
                return Status.Operating;
            case Stopped:
                return Status.Stopped;
            case Finished:
                return Status.Finished;
            }
        } catch (RemoteException e) {
            log.warn("problem getting remote status", e);
        }
        return Status.Finished;
    }

    @Override
    public Workflow getWorkflow() {
        return workflow;
    }

    /**
     * @return A delegate for the working directory of the remote run.
     * @throws FilesystemAccessException
     *             If the handle cannot be obtained; the cause is the
     *             underlying problem, unwrapped one level where possible.
     */
    @Override
    public Directory getWorkingDirectory() throws FilesystemAccessException {
        try {
            return new DirectoryDelegate(run.getWorkingDirectory());
        } catch (Throwable e) {
            if (e.getCause() != null)
                e = e.getCause();
            throw new FilesystemAccessException(
                    "problem getting main working directory handle", e);
        }
    }

    /**
     * Set the expiry time. Times that are not in the future are ignored, but
     * the run is flushed to the database either way.
     */
    @Override
    public void setExpiry(Date d) {
        if (d.after(new Date()))
            expiry = new Date(d.getTime());
        db.flushToDisk(this);
    }

    /**
     * Ask the remote run to change state.
     *
     * @param s
     *            The state to move to.
     * @return <tt>null</tt> if the change was carried out, or the message of
     *         the {@link StillWorkingOnItException} if the remote side is
     *         still working on it.
     * @throws BadStateChangeException
     *             If the change is refused or fails for any other reason.
     */
    @Override
    public String setStatus(Status s) throws BadStateChangeException {
        try {
            log.info("setting status of run " + id + " to " + s);
            switch (s) {
            case Initialized:
                run.setStatus(RemoteStatus.Initialized);
                break;
            case Operating:
                // Only a run that has not yet been started needs the overload
                // check and to have its security settings conveyed.
                if (run.getStatus() == RemoteStatus.Initialized) {
                    if (!factory.isAllowingRunsToStart())
                        throw new OverloadedException();
                    secContext.conveySecurity();
                }
                run.setGenerateProvenance(generateProvenance);
                run.setStatus(RemoteStatus.Operating);
                factory.getMasterEventFeed()
                        .started(
                                this,
                                "started run execution",
                                "The execution of run '" + getName()
                                        + "' has started.");
                break;
            case Stopped:
                run.setStatus(RemoteStatus.Stopped);
                break;
            case Finished:
                run.setStatus(RemoteStatus.Finished);
                break;
            }
            return null;
        } catch (IllegalStateTransitionException e) {
            throw new BadStateChangeException(e.getMessage(), e);
        } catch (RemoteException e) {
            throw new BadStateChangeException(e.getMessage(), e.getCause());
        } catch (GeneralSecurityException | IOException e) {
            throw new BadStateChangeException(e.getMessage(), e);
        } catch (ImplementationException e) {
            if (e.getCause() != null)
                throw new BadStateChangeException(e.getMessage(), e.getCause());
            throw new BadStateChangeException(e.getMessage(), e);
        } catch (StillWorkingOnItException e) {
            log.info("still working on setting status of run " + id + " to "
                    + s, e);
            return e.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up the stack
            // can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new BadStateChangeException(
                    "interrupted while waiting to insert notification into database",
                    e);
        }
    }

    /**
     * Reject filenames that are absolute or that contain a <tt>..</tt>
     * component.
     *
     * @param filename
     *            The <tt>/</tt>-separated filename to check.
     * @throws FilesystemAccessException
     *             If the filename starts with <tt>/</tt> or has <tt>..</tt>
     *             as one of its components.
     */
    static void checkBadFilename(String filename)
            throws FilesystemAccessException {
        if (filename.startsWith("/"))
            throw new FilesystemAccessException("filename may not be absolute");
        if (Arrays.asList(filename.split("/")).contains(".."))
            throw new FilesystemAccessException(
                    "filename may not refer to parent");
    }

    /** @return The input baclava file name, or <tt>null</tt> on remote failure. */
    @Override
    public String getInputBaclavaFile() {
        try {
            return run.getInputBaclavaFile();
        } catch (RemoteException e) {
            log.warn("problem when fetching input baclava file", e);
            return null;
        }
    }

    /**
     * @return Delegates for the inputs reported by the remote run; if the
     *         remote call fails, whatever was collected so far is returned.
     */
    @Override
    public List<Input> getInputs() {
        ArrayList<Input> inputs = new ArrayList<>();
        try {
            for (RemoteInput ri : run.getInputs())
                inputs.add(new RunInput(ri));
        } catch (RemoteException e) {
            log.warn("problem when fetching list of workflow inputs", e);
        }
        return inputs;
    }

    /** @return The output baclava file name, or <tt>null</tt> on remote failure. */
    @Override
    public String getOutputBaclavaFile() {
        try {
            return run.getOutputBaclavaFile();
        } catch (RemoteException e) {
            log.warn("problem when fetching output baclava file", e);
            return null;
        }
    }

    @Override
    public Input makeInput(String name) throws BadStateChangeException {
        try {
            return new RunInput(run.makeInput(name));
        } catch (RemoteException e) {
            throw new BadStateChangeException("failed to make input", e);
        }
    }

    @Override
    public void setInputBaclavaFile(String filename)
            throws FilesystemAccessException, BadStateChangeException {
        checkBadFilename(filename);
        try {
            run.setInputBaclavaFile(filename);
        } catch (RemoteException e) {
            throw new FilesystemAccessException(
                    "cannot set input baclava file name", e);
        }
    }

    @Override
    public void setOutputBaclavaFile(String filename)
            throws FilesystemAccessException, BadStateChangeException {
        checkBadFilename(filename);
        try {
            run.setOutputBaclavaFile(filename);
        } catch (RemoteException e) {
            throw new FilesystemAccessException(
                    "cannot set output baclava file name", e);
        }
    }

    /** @return A defensive copy of the creation time, or <tt>null</tt> if unset. */
    @Override
    public Date getCreationTimestamp() {
        return creationInstant == null ? null : new Date(
                creationInstant.getTime());
    }

    /** @return The finish time from the remote run, or <tt>null</tt> on remote failure. */
    @Override
    public Date getFinishTimestamp() {
        try {
            return run.getFinishTimestamp();
        } catch (RemoteException e) {
            log.info("failed to get finish timestamp", e);
            return null;
        }
    }

    /** @return The start time from the remote run, or <tt>null</tt> on remote failure. */
    @Override
    public Date getStartTimestamp() {
        try {
            return run.getStartTimestamp();
        } catch (RemoteException e) {
            log.info("failed to get start timestamp", e);
            return null;
        }
    }

    /**
     * @param readers
     *            the readers to set
     */
    public void setReaders(Set<String> readers) {
        this.readers = new HashSet<>(readers);
        db.flushToDisk(this);
    }

    /**
     * @return the readers
     */
    public Set<String> getReaders() {
        return readers == null ? new HashSet<String>()
                : unmodifiableSet(readers);
    }

    /**
     * @param writers
     *            the writers to set
     */
    public void setWriters(Set<String> writers) {
        this.writers = new HashSet<>(writers);
        db.flushToDisk(this);
    }

    /**
     * @return the writers
     */
    public Set<String> getWriters() {
        return writers == null ? new HashSet<String>()
                : unmodifiableSet(writers);
    }

    /**
     * @param destroyers
     *            the destroyers to set
     */
    public void setDestroyers(Set<String> destroyers) {
        this.destroyers = new HashSet<>(destroyers);
        db.flushToDisk(this);
    }

    /**
     * @return the destroyers
     */
    public Set<String> getDestroyers() {
        return destroyers == null ? new HashSet<String>()
                : unmodifiableSet(destroyers);
    }

    /**
     * Custom serialization: after the default fields, writes the owner name,
     * the security context factory and the marshalled remote run, in that
     * order. {@link #readObject(ObjectInputStream)} reads them back in the
     * same order.
     */
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(secContext.getOwner().getName());
        out.writeObject(secContext.getFactory());
        out.writeObject(new MarshalledObject<>(run));
    }

    @Override
    public boolean getGenerateProvenance() {
        return generateProvenance;
    }

    @Override
    public void setGenerateProvenance(boolean generateProvenance) {
        this.generateProvenance = generateProvenance;
        db.flushToDisk(this);
    }

    /**
     * Custom deserialization, mirroring {@link #writeObject(ObjectOutputStream)}:
     * rebuilds the logger, the security context and the remote run handle.
     *
     * @throws SecurityContextReconstructionException
     *             If the factory fails with a checked exception other than
     *             {@link IOException}.
     */
    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException,
            ClassNotFoundException {
        in.defaultReadObject();
        // Field initialisers are not run during deserialization, so the
        // transient logger has to be recreated by hand.
        if (log == null)
            log = getLog(LOG_NAME);
        final String creatorName = in.readUTF();
        SecurityContextFactory scFactory = (SecurityContextFactory) in
                .readObject();
        try {
            secContext = scFactory.create(this,
                    new UsernamePrincipal(creatorName));
        } catch (RuntimeException | IOException e) {
            throw e;
        } catch (Exception e) {
            throw new SecurityContextReconstructionException(e);
        }
        run = ((MarshalledObject<RemoteSingleRun>) in.readObject()).get();
    }

    public void setSecurityContext(TavernaSecurityContext tavernaSecurityContext) {
        secContext = tavernaSecurityContext;
    }

    @Override
    public String getName() {
        return name;
    }

    /**
     * Set the name of the run, truncating it to
     * {@link RunConnection#NAME_LENGTH} characters, and flush the run to the
     * database.
     */
    @Override
    public void setName(@Nonnull String name) {
        if (name.length() > RunConnection.NAME_LENGTH)
            this.name = name.substring(0, RunConnection.NAME_LENGTH);
        else
            this.name = name;
        db.flushToDisk(this);
    }

    /**
     * Check that the remote run can still be contacted.
     *
     * @throws UnknownRunException
     *             If the remote call fails.
     */
    @Override
    public void ping() throws UnknownRunException {
        try {
            run.ping();
        } catch (RemoteException e) {
            throw new UnknownRunException(e);
        }
    }
}

/**
 * Common behaviour of the local delegates for remote directory entries. The
 * short name, the full name and (for a limited time) the modification date
 * are cached after they have been fetched from the remote entry.
 */
abstract class DEDelegate implements DirectoryEntry {
    /** How long, in milliseconds, a fetched modification date is reused. */
    private static final long MOD_TIME_CACHE_MILLIS = 5000L;

    Log log = getLog("Taverna.Server.Worker");
    private RemoteDirectoryEntry entry;
    private String name;
    private String full;
    private Date cacheModTime;
    private long cacheQueryTime = 0L;

    DEDelegate(RemoteDirectoryEntry entry) {
        this.entry = entry;
    }

    @Override
    public void destroy() throws FilesystemAccessException {
        try {
            entry.destroy();
        } catch (IOException e) {
            throw new FilesystemAccessException(
                    "failed to delete directory entry", e);
        }
    }

    /**
     * @return The name of this entry prefixed by the names of all its
     *         containing directories, joined with <tt>/</tt>. If the walk up
     *         the tree fails part-way, the partial result is logged, cached
     *         and returned.
     */
    @Override
    public String getFullName() {
        if (full != null)
            return full;
        String n = getName();
        RemoteDirectoryEntry re = entry;
        try {
            while (true) {
                RemoteDirectory parent = re.getContainingDirectory();
                if (parent == null)
                    break;
                n = parent.getName() + "/" + n;
                re = parent;
            }
        } catch (RemoteException e) {
            log.warn("failed to generate full name", e);
        }
        return (full = n);
    }

    /**
     * @return The name of the entry, or <tt>null</tt> if it has never been
     *         fetched successfully.
     */
    @Override
    public String getName() {
        if (name == null)
            try {
                name = entry.getName();
            } catch (RemoteException e) {
                log.error("failed to get name", e);
            }
        return name;
    }

    /**
     * @return The modification date of the entry. The remote side is asked
     *         only if there is no cached value or the cached value is at
     *         least {@value #MOD_TIME_CACHE_MILLIS} ms old; on remote failure
     *         the previous value (possibly <tt>null</tt>) is returned.
     */
    @Override
    public Date getModificationDate() {
        if (cacheModTime == null
                || currentTimeMillis() - cacheQueryTime >= MOD_TIME_CACHE_MILLIS)
            try {
                cacheModTime = entry.getModificationDate();
                cacheQueryTime = currentTimeMillis();
            } catch (RemoteException e) {
                log.error("failed to get modification time", e);
            }
        return cacheModTime;
    }

    /** Orders entries by their full names. */
    @Override
    public int compareTo(DirectoryEntry de) {
        return getFullName().compareTo(de.getFullName());
    }

    /** Two delegates are equal when their full names are equal. */
    @Override
    public boolean equals(Object o) {
        return o instanceof DEDelegate
                && getFullName().equals(((DEDelegate) o).getFullName());
    }

    @Override
    public int hashCode() {
        return getFullName().hashCode();
    }
}

/** Local delegate for a remote directory. */
class DirectoryDelegate extends DEDelegate implements Directory {
    RemoteDirectory rd;

    DirectoryDelegate(RemoteDirectory dir) {
        super(dir);
        rd = dir;
    }

    /**
     * @return Delegates for the entries of the directory: a
     *         {@link DirectoryDelegate} for each remote directory and a
     *         {@link FileDelegate} for everything else.
     */
    @Override
    public Collection<DirectoryEntry> getContents()
            throws FilesystemAccessException {
        ArrayList<DirectoryEntry> result = new ArrayList<>();
        try {
            for (RemoteDirectoryEntry rde : rd.getContents()) {
                if (rde instanceof RemoteDirectory)
                    result.add(new DirectoryDelegate((RemoteDirectory) rde));
                else
                    result.add(new FileDelegate((RemoteFile) rde));
            }
        } catch (IOException e) {
            throw new FilesystemAccessException(
                    "failed to get directory contents", e);
        }
        return result;
    }

    /** @return The contents of the directory, sorted by {@link DateComparator}. */
    @Override
    public Collection<DirectoryEntry> getContentsByDate()
            throws FilesystemAccessException {
        ArrayList<DirectoryEntry> result = new ArrayList<>(getContents());
        sort(result, new DateComparator());
        return result;
    }

    /**
     * Orders directory entries by modification date, oldest first. Entries
     * whose date is unavailable (<tt>null</tt>) sort before all others.
     */
    static class DateComparator implements Comparator<DirectoryEntry> {
        @Override
        public int compare(DirectoryEntry a, DirectoryEntry b) {
            Date first = a.getModificationDate();
            Date second = b.getModificationDate();
            if (first == null)
                return second == null ? 0 : -1;
            if (second == null)
                return 1;
            return first.compareTo(second);
        }
    }

    @Override
    public File makeEmptyFile(Principal actor, String name)
            throws FilesystemAccessException {
        try {
            return new FileDelegate(rd.makeEmptyFile(name));
        } catch (IOException e) {
            throw new FilesystemAccessException("failed to make empty file", e);
        }
    }

    @Override
    public Directory makeSubdirectory(Principal actor, String name)
            throws FilesystemAccessException {
        try {
            return new DirectoryDelegate(rd.makeSubdirectory(name));
        } catch (IOException e) {
            throw new FilesystemAccessException("failed to make subdirectory",
                    e);
        }
    }

    /**
     * Start zipping this directory on a daemon thread and return the stream
     * the ZIP data is piped into. Problems hit while zipping are logged by
     * the worker thread; the ZIP output is always closed when it ends.
     *
     * @throws FilesystemAccessException
     *             If the pipe to the returned stream cannot be set up.
     */
    @Override
    public ZipStream getContentsAsZip() throws FilesystemAccessException {
        ZipStream zs = new ZipStream();

        final ZipOutputStream zos;
        try {
            zos = new ZipOutputStream(new PipedOutputStream(zs));
        } catch (IOException e) {
            throw new FilesystemAccessException("problem building zip stream",
                    e);
        }
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    zipDirectory(rd, null, zos);
                } catch (IOException e) {
                    log.warn("problem when zipping directory", e);
                } finally {
                    closeQuietly(zos);
                }
            }
        });
        t.setDaemon(true);
        t.start();
        return zs;
    }

    /**
     * Compresses a directory tree into a ZIP.
     *
     * @param dir
     *            The directory to compress.
     * @param base
     *            The base name of the directory (or <tt>null</tt> if this is
     *            the root directory of the ZIP).
     * @param zos
     *            Where to write the compressed data.
     * @throws RemoteException
     *             If some kind of problem happens with the remote delegates.
     * @throws IOException
     *             If we run into problems with reading or writing data.
     */
    void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos)
            throws RemoteException, IOException {
        for (RemoteDirectoryEntry rde : dir.getContents()) {
            String name = rde.getName();
            if (base != null)
                name = base + "/" + name;
            if (rde instanceof RemoteDirectory) {
                zipDirectory((RemoteDirectory) rde, name, zos);
            } else {
                RemoteFile rf = (RemoteFile) rde;
                zos.putNextEntry(new ZipEntry(name));
                try {
                    // Copy the file across in chunks of at most 64kB until
                    // the remote side reports that there is nothing left.
                    int off = 0;
                    while (true) {
                        byte[] c = rf.getContents(off, 64 * 1024);
                        if (c == null || c.length == 0)
                            break;
                        zos.write(c);
                        off += c.length;
                    }
                } finally {
                    zos.closeEntry();
                }
            }
        }
    }
}

/** Local delegate for a remote file. */
class FileDelegate extends DEDelegate implements File {
    RemoteFile rf;

    FileDelegate(RemoteFile f) {
        super(f);
        this.rf = f;
    }

    @Override
    public byte[] getContents(int offset, int length)
            throws FilesystemAccessException {
        try {
            return rf.getContents(offset, length);
        } catch (IOException e) {
            throw new FilesystemAccessException("failed to read file contents",
                    e);
        }
    }

    @Override
    public long getSize() throws FilesystemAccessException {
        try {
            return rf.getSize();
        } catch (IOException e) {
            throw new FilesystemAccessException("failed to get file length", e);
        }
    }

    @Override
    public void setContents(byte[] data) throws FilesystemAccessException {
        try {
            rf.setContents(data);
        } catch (IOException e) {
            throw new FilesystemAccessException(
                    "failed to write file contents", e);
        }
    }

    @Override
    public void appendContents(byte[] data) throws FilesystemAccessException {
        try {
            rf.appendContents(data);
        } catch (IOException e) {
            throw new FilesystemAccessException(
                    "failed to write file contents", e);
        }
    }

    /**
     * Copy the contents of another file into this one.
     *
     * @param from
     *            The file to copy from; must be a {@link FileDelegate}.
     * @throws FilesystemAccessException
     *             If <tt>from</tt> is not a {@link FileDelegate} (including
     *             <tt>null</tt>) or the remote copy fails.
     */
    @Override
    public void copy(File from) throws FilesystemAccessException {
        if (!(from instanceof FileDelegate))
            throw new FilesystemAccessException("different types of File?!");
        try {
            rf.copy(((FileDelegate) from).rf);
        } catch (Exception e) {
            throw new FilesystemAccessException("failed to copy file contents",
                    e);
        }
    }
}

/** Local delegate for a remote listener. */
class ListenerDelegate implements Listener {
    private Log log = getLog("Taverna.Server.Worker");
    private RemoteListener r;
    String conf;

    ListenerDelegate(RemoteListener l) {
        r = l;
    }

    RemoteListener getRemote() {
        return r;
    }

    /**
     * @return The configuration of the listener, fetched once and then
     *         cached; <tt>null</tt> if it has never been fetched successfully.
     */
    @Override
    public String getConfiguration() {
        try {
            if (conf == null)
                conf = r.getConfiguration();
        } catch (RemoteException e) {
            log.warn("failed to get configuration", e);
        }
        return conf;
    }

    /** @return The listener's name, or <tt>"UNKNOWN NAME"</tt> on remote failure. */
    @Override
    public String getName() {
        try {
            return r.getName();
        } catch (RemoteException e) {
            log.warn("failed to get name", e);
            return "UNKNOWN NAME";
        }
    }

    @Override
    public String getProperty(String propName) throws NoListenerException {
        try {
            return r.getProperty(propName);
        } catch (RemoteException e) {
            throw new NoListenerException("no such property: " + propName, e);
        }
    }

    /** @return The listener's type, or <tt>"UNKNOWN TYPE"</tt> on remote failure. */
    @Override
    public String getType() {
        try {
            return r.getType();
        } catch (RemoteException e) {
            log.warn("failed to get type", e);
            return "UNKNOWN TYPE";
        }
    }

    /** @return The property names, or an empty array on remote failure. */
    @Override
    public String[] listProperties() {
        try {
            return r.listProperties();
        } catch (RemoteException e) {
            log.warn("failed to list properties", e);
            return new String[0];
        }
    }

    /**
     * Set a property of the remote listener.
     *
     * @throws NoListenerException
     *             If the remote call fails and its cause is a
     *             {@link RuntimeException}.
     * @throws BadPropertyValueException
     *             If the remote call fails for any other reason.
     */
    @Override
    public void setProperty(String propName, String value)
            throws NoListenerException, BadPropertyValueException {
        try {
            r.setProperty(propName, value);
        } catch (RemoteException e) {
            log.warn("failed to set property", e);
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException)
                throw new NoListenerException("failed to set property",
                        cause);
            if (cause instanceof Exception)
                throw new BadPropertyValueException("failed to set property",
                        cause);
            throw new BadPropertyValueException("failed to set property", e);
        }
    }
}

/**
 * Local delegate for a remote workflow input. The getters return
 * <tt>null</tt> when the remote call fails.
 */
class RunInput implements Input {
    private final RemoteInput i;

    RunInput(RemoteInput remote) {
        this.i = remote;
    }

    @Override
    public String getFile() {
        try {
            return i.getFile();
        } catch (RemoteException e) {
            return null;
        }
    }

    @Override
    public String getName() {
        try {
            return i.getName();
        } catch (RemoteException e) {
            return null;
        }
    }

    @Override
    public String getValue() {
        try {
            return i.getValue();
        } catch (RemoteException e) {
            return null;
        }
    }

    @Override
    public void setFile(String file) throws FilesystemAccessException,
            BadStateChangeException {
        checkBadFilename(file);
        try {
            i.setFile(file);
        } catch (RemoteException e) {
            throw new FilesystemAccessException("cannot set file for input", e);
        }
    }

    @Override
    public void setValue(String value) throws BadStateChangeException {
        try {
            i.setValue(value);
        } catch (RemoteException e) {
            throw new BadStateChangeException(e);
        }
    }

    @Override
    public String getDelimiter() {
        try {
            return i.getDelimiter();
        } catch (RemoteException e) {
            return null;
        }
    }

    /**
     * Set the delimiter of the input. Only the first character of the given
     * string is used; <tt>null</tt> and the empty string are passed through
     * unchanged.
     */
    @Override
    public void setDelimiter(String delimiter) throws BadStateChangeException {
        try {
            if (delimiter != null && delimiter.length() > 1)
                delimiter = delimiter.substring(0, 1);
            i.setDelimiter(delimiter);
        } catch (RemoteException e) {
            throw new BadStateChangeException(e);
        }
    }
}

/**
 * Thrown when the security context of a run cannot be rebuilt while the run
 * is being deserialized.
 */
@SuppressWarnings("serial")
class SecurityContextReconstructionException extends RuntimeException {
    public SecurityContextReconstructionException(Throwable t) {
        super("failed to rebuild security context", t);
    }
}
// Source: http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunConnection.java
// (new file, mode 100644, index 0000000..cf55ea0)
/*
 * Copyright (C) 2010-2013 The University of Manchester
 *
 * See the file "LICENSE" for license terms.
 */
package org.taverna.server.master.worker;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY;
import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY;
import static org.taverna.server.master.worker.RunConnection.SCHEMA;
import static org.taverna.server.master.worker.RunConnection.TABLE;
import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY;
import static org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY;

import java.io.IOException;
import java.rmi.MarshalledObject;
import java.util.Date;
import java.util.HashSet;
import java.util.List;

import javax.annotation.Nonnull;
import javax.jdo.annotations.Column;
import javax.jdo.annotations.Join;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.Persistent;
import javax.jdo.annotations.PrimaryKey;
import javax.jdo.annotations.Queries;
import javax.jdo.annotations.Query;

import org.taverna.server.localworker.remote.RemoteSingleRun;
import org.taverna.server.master.common.Credential;
import org.taverna.server.master.common.Trust;
import org.taverna.server.master.common.Workflow;
import org.taverna.server.master.interfaces.SecurityContextFactory;
import org.taverna.server.master.utils.UsernamePrincipal;

/**
 * The representation of the connections to the runs that actually participates
 * in the persistence system.
 *
 * @author Donal Fellows
 */
@PersistenceCapable(table = TABLE, schema = SCHEMA)
@Queries({
        @Query(name = "count", language = "SQL", value = COUNT_QUERY, unique = "true", resultClass = Integer.class),
        @Query(name = "names", language = "SQL", value = NAMES_QUERY, unique = "false", resultClass = String.class),
        @Query(name = "unterminated", language = "SQL", value = UNTERMINATED_QUERY, unique = "false", resultClass = String.class),
        @Query(name = "timedout", language = "SQL", value = TIMEOUT_QUERY, unique = "false", resultClass = String.class) })
public class RunConnection {
    static final String SCHEMA = "TAVERNA";
    static final String TABLE = "RUN_CONNECTION";
    private static final String FULL_NAME = SCHEMA + "." + TABLE;
    static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME;
    static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME;
    static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME
            + " WHERE expiry < CURRENT_TIMESTAMP";
    static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME
            + " WHERE doneTransitionToFinished = 0";
    /** Maximum length of a run name, as declared on the name column. */
    static final int NAME_LENGTH = 48;

    @PrimaryKey
    @Column(length = 40)
    private String id;

    @Persistent(defaultFetchGroup = "true")
    @Column(length = NAME_LENGTH)
    private String name;

    @Persistent(defaultFetchGroup = "true")
    private Date creationInstant;

    @Persistent(defaultFetchGroup = "true", serialized = "true")
    @Column(jdbcType = "BLOB", sqlType = "BLOB")
    private Workflow workflow;

    @Persistent(defaultFetchGroup = "true")
    private Date expiry;

    @Persistent(defaultFetchGroup = "true")
    @Join(table = TABLE + "_READERS", column = "ID")
    private String[] readers;

    @Persistent(defaultFetchGroup = "true")
    @Join(table = TABLE + "_WRITERS", column = "ID")
    private String[] writers;

    @Persistent(defaultFetchGroup = "true")
    @Join(table = TABLE + "_DESTROYERS", column = "ID")
    private String[] destroyers;

    @Persistent(defaultFetchGroup = "true", serialized = "true")
    @Column(jdbcType = "BLOB", sqlType = "BLOB")
    private MarshalledObject<RemoteSingleRun> run;

    // The two flags below are stored as integers (0 = false, non-zero = true);
    // UNTERMINATED_QUERY compares the first of them against 0.
    @Persistent(defaultFetchGroup = "true")
    private int doneTransitionToFinished;

    @Persistent(defaultFetchGroup = "true")
    private int generateProvenance;

    @Persistent(defaultFetchGroup = "true")
    @Column(length = 128)
    String owner;

    @Persistent(defaultFetchGroup = "true")
    @Column(length = 36)
    private String securityToken;

    @Persistent(defaultFetchGroup = "true", serialized = "true")
    @Column(jdbcType = "BLOB", sqlType = "BLOB")
    private SecurityContextFactory securityContextFactory;
    @Persistent(defaultFetchGroup = "true", serialized = "true")
    @Column(jdbcType = "BLOB", sqlType = "BLOB")
    private Credential[] credentials;
    @Persistent(defaultFetchGroup = "true", serialized = "true")
    @Column(jdbcType = "BLOB", sqlType = "BLOB")
    private Trust[] trust;

    /** Typed empty array handed to <tt>toArray</tt> calls. */
    private static final String[] STRING_ARY = new String[0];

    public String getId() {
        return id;
    }

    public boolean isFinished() {
        return doneTransitionToFinished != 0;
    }

    public void setFinished(boolean finished) {
        doneTransitionToFinished = (finished ? 1 : 0);
    }

    public boolean isProvenanceGenerated() {
        return generateProvenance != 0;
    }

    public void setProvenanceGenerated(boolean generate) {
        generateProvenance = (generate ? 1 : 0);
    }

    /**
     * Manufacture a persistent representation of the given workflow run. Must
     * be called within the context of a transaction.
     *
     * @param rrd
     *            The remote delegate of the workflow run.
     * @return The persistent object.
     * @throws IOException
     *             If serialisation fails.
     */
    @Nonnull
    public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd)
            throws IOException {
        RunConnection rc = new RunConnection();
        // Go through the accessor rather than the raw field: the field is
        // only filled in lazily, and the primary key must not be null.
        rc.id = rrd.getId();
        rc.makeChanges(rrd);
        return rc;
    }

    /** Views an array as a list, treating <tt>null</tt> as empty. */
    private static List<String> list(String[] ary) {
        if (ary == null)
            return emptyList();
        return asList(ary);
    }

    /**
     * Get the remote run delegate for a particular persistent connection. Must
     * be called within the context of a transaction.
     *
     * @param db
     *            The database facade.
     * @return The delegate object.
     * @throws Exception
     *             If anything goes wrong.
     */
    @Nonnull
    public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db)
            throws Exception {
        RemoteRunDelegate rrd = new RemoteRunDelegate();
        rrd.id = getId();
        rrd.creationInstant = creationInstant;
        rrd.workflow = workflow;
        rrd.expiry = expiry;
        rrd.readers = new HashSet<>(list(readers));
        rrd.writers = new HashSet<>(list(writers));
        rrd.destroyers = new HashSet<>(list(destroyers));
        rrd.run = run.get();
        rrd.doneTransitionToFinished = isFinished();
        rrd.generateProvenance = isProvenanceGenerated();
        rrd.secContext = securityContextFactory.create(rrd,
                new UsernamePrincipal(owner));
        ((SecurityContextDelegate) rrd.secContext).setCredentialsAndTrust(
                credentials, trust);
        rrd.db = db;
        rrd.factory = db.getFactory();
        rrd.name = name;
        return rrd;
    }

    /**
     * Flush changes from a remote run delegate to the database. Must be called
     * within the context of a transaction.
     *
     * @param rrd
     *            The remote run delegate object that has potential changes.
     * @throws IOException
     *             If anything goes wrong in serialization.
     */
    public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws IOException {
        // Properties that are set exactly once
        if (creationInstant == null) {
            creationInstant = rrd.getCreationTimestamp();
            workflow = rrd.getWorkflow();
            run = new MarshalledObject<>(rrd.run);
            securityContextFactory = rrd.getSecurityContext().getFactory();
            owner = rrd.getSecurityContext().getOwner().getName();
            securityToken = ((org.taverna.server.master.worker.SecurityContextFactory) securityContextFactory)
                    .issueNewPassword();
        }
        // Properties that are set multiple times
        expiry = rrd.getExpiry();
        readers = rrd.getReaders().toArray(STRING_ARY);
        writers = rrd.getWriters().toArray(STRING_ARY);
        destroyers = rrd.getDestroyers().toArray(STRING_ARY);
        credentials = rrd.getSecurityContext().getCredentials();
        trust = rrd.getSecurityContext().getTrusted();
        // The name may be null (fromDBform copies the column unchecked), so
        // only truncate when there is something to truncate.
        String runName = rrd.name;
        if (runName != null && runName.length() > NAME_LENGTH)
            runName = runName.substring(0, NAME_LENGTH);
        this.name = runName;
        setFinished(rrd.doneTransitionToFinished);
        setProvenanceGenerated(rrd.generateProvenance);
    }

    public String getSecurityToken() {
        return securityToken;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java new file mode 100644 index 0000000..2aa7ed1 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. 
+ */
+package org.taverna.server.master.worker;
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.taverna.server.master.notification.NotificationEngine;
+
+/**
+ * The interface to the database of runs.
+ *
+ * @author Donal Fellows
+ */
+public interface RunDBSupport {
+    /**
+     * Scan each run to see if it has finished yet and issue registered
+     * notifications if it has.
+     */
+    void checkForFinishNow();
+
+    /**
+     * Remove currently-expired runs from this database.
+     */
+    void cleanNow();
+
+    /**
+     * How many runs are stored in the database.
+     *
+     * @return The current size of the run table.
+     */
+    int countRuns();
+
+    /**
+     * Ensure that a run gets persisted in the database. It is assumed that a
+     * record for the run is already in there; this only writes back the
+     * current state of the run to that record.
+     *
+     * @param run
+     *            The run to persist.
+     */
+    void flushToDisk(@Nonnull RemoteRunDelegate run);
+
+    /**
+     * Select an arbitrary representative run.
+     *
+     * @return The selected run, or <tt>null</tt> if no run could be selected.
+     * @throws Exception
+     *             If anything goes wrong.
+     */
+    @Nullable
+    RemoteRunDelegate pickArbitraryRun() throws Exception;
+
+    /**
+     * Get a list of all the run names.
+     *
+     * @return The names (i.e., UUIDs) of all the runs.
+     */
+    @Nonnull
+    List<String> listRunNames();
+
+    /**
+     * Install the notification fabric.
+     *
+     * @param notificationEngine
+     *            A reference to the notification fabric bean.
+     */
+    void setNotificationEngine(NotificationEngine notificationEngine);
+
+    /**
+     * Install the generator of run-termination messages.
+     *
+     * @param notifier
+     *            A reference to the bean that creates messages about workflow
+     *            run termination.
+     */
+    void setNotifier(CompletionNotifier notifier);
+
+    /**
+     * @return A reference to the actual factory for remote runs.
+     */
+    FactoryBean getFactory();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
new file mode 100644
index 0000000..cedb4b5
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright (C) 2010-2013 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import static java.lang.Integer.parseInt;
+import static java.util.UUID.randomUUID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.RunStore;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.notification.NotificationEngine;
+import org.taverna.server.master.notification.NotificationEngine.Message;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The main facade bean that interfaces to the database of runs.
+ * + * @author Donal Fellows + */ +public class RunDatabase implements RunStore, RunDBSupport { + private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB"); + RunDatabaseDAO dao; + CompletionNotifier backupNotifier; + Map<String, CompletionNotifier> typedNotifiers; + private NotificationEngine notificationEngine; + @Autowired + private FactoryBean factory; + private Map<String, TavernaRun> cache = new HashMap<>(); + + @Override + @Required + public void setNotifier(CompletionNotifier n) { + backupNotifier = n; + } + + public void setTypeNotifiers(List<CompletionNotifier> notifiers) { + typedNotifiers = new HashMap<>(); + for (CompletionNotifier n : notifiers) + typedNotifiers.put(n.getName(), n); + } + + @Required + @Override + public void setNotificationEngine(NotificationEngine notificationEngine) { + this.notificationEngine = notificationEngine; + } + + @Required + public void setDao(RunDatabaseDAO dao) { + this.dao = dao; + } + + @Override + public void checkForFinishNow() { + /* + * Get which runs are actually newly finished; this requires getting the + * candidates from the database and *then* doing the expensive requests + * to the back end to find out the status. + */ + Map<String, RemoteRunDelegate> notifiable = new HashMap<>(); + for (RemoteRunDelegate p : dao.getPotentiallyNotifiable()) + if (p.getStatus() == Status.Finished) + notifiable.put(p.getId(), p); + + // Check if there's nothing more to do + if (notifiable.isEmpty()) + return; + + /* + * Tell the database about the ones we've got. + */ + dao.markFinished(notifiable.keySet()); + + /* + * Send out the notifications. The notification addresses are stored in + * the back-end engine, so this is *another* thing that can take time. 
+ */ + for (RemoteRunDelegate rrd : notifiable.values()) + for (Listener l : rrd.getListeners()) + if (l.getName().equals("io")) { + try { + notifyFinished(rrd.id, l, rrd); + } catch (Exception e) { + log.warn("failed to do notification of completion", e); + } + break; + } + } + + @Override + public void cleanNow() { + List<String> cleaned; + try { + cleaned = dao.doClean(); + } catch (Exception e) { + log.warn("failure during deletion of expired runs", e); + return; + } + synchronized (cache) { + for (String id : cleaned) + cache.remove(id); + } + } + + @Override + public int countRuns() { + return dao.countRuns(); + } + + @Override + public void flushToDisk(RemoteRunDelegate run) { + try { + dao.flushToDisk(run); + } catch (IOException e) { + throw new RuntimeException( + "unexpected problem when persisting run record in database", + e); + } + } + + @Override + public RemoteRunDelegate pickArbitraryRun() throws Exception { + return dao.pickArbitraryRun(); + } + + @Override + public List<String> listRunNames() { + return dao.listRunNames(); + } + + @Nullable + private TavernaRun get(String uuid) { + TavernaRun run = null; + synchronized (cache) { + run = cache.get(uuid); + } + try { + if (run != null) + run.ping(); + } catch (UnknownRunException e) { + if (log.isDebugEnabled()) + log.debug("stale mapping in cache?", e); + // Don't need to flush the cache; this happens when cleaning anyway + run = null; + } + if (run == null) + run = dao.get(uuid); + return run; + } + + @Override + public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid) + throws UnknownRunException { + // Check first to see if the 'uuid' actually looks like a UUID; if + // not, throw it out immediately without logging an exception. 
+ try { + UUID.fromString(uuid); + } catch (IllegalArgumentException e) { + if (log.isDebugEnabled()) + log.debug("run ID does not look like UUID; rejecting..."); + throw new UnknownRunException(); + } + TavernaRun run = get(uuid); + if (run != null && (user == null || p.permitAccess(user, run))) + return run; + throw new UnknownRunException(); + } + + @Override + public TavernaRun getRun(String uuid) throws UnknownRunException { + TavernaRun run = get(uuid); + if (run != null) + return run; + throw new UnknownRunException(); + } + + @Override + public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) { + synchronized (cache) { + Map<String, TavernaRun> cached = new HashMap<>(); + for (Entry<String, TavernaRun> e : cache.entrySet()) { + TavernaRun r = e.getValue(); + if (p.permitAccess(user, r)) + cached.put(e.getKey(), r); + } + if (!cached.isEmpty()) + return cached; + } + return dao.listRuns(user, p); + } + + private void logLength(String message, Object obj) { + if (!log.isDebugEnabled()) + return; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(obj); + } + log.debug(message + ": " + baos.size()); + } catch (Exception e) { + log.warn("oops", e); + } + } + + @Override + public String registerRun(TavernaRun run) { + if (!(run instanceof RemoteRunDelegate)) + throw new IllegalArgumentException( + "run must be created by localworker package"); + RemoteRunDelegate rrd = (RemoteRunDelegate) run; + if (rrd.id == null) + rrd.id = randomUUID().toString(); + logLength("RemoteRunDelegate serialized length", rrd); + try { + dao.persistRun(rrd); + } catch (IOException e) { + throw new RuntimeException( + "unexpected problem when persisting run record in database", + e); + } + synchronized (cache) { + cache.put(rrd.getId(), run); + } + return rrd.getId(); + } + + @Override + public void unregisterRun(String uuid) { + try { + if (dao.unpersistRun(uuid)) + 
synchronized (cache) { + cache.remove(uuid); + } + } catch (RuntimeException e) { + if (log.isDebugEnabled()) + log.debug("problem persisting the deletion of the run " + uuid, + e); + } + } + + /** + * Process the event that a run has finished. + * + * @param name + * The name of the run. + * @param io + * The io listener of the run (used to get information about the + * run). + * @param run + * The handle to the run. + * @throws Exception + * If anything goes wrong. + */ + private void notifyFinished(final String name, Listener io, + final RemoteRunDelegate run) throws Exception { + String to = io.getProperty("notificationAddress"); + final int code; + try { + code = parseInt(io.getProperty("exitcode")); + } catch (NumberFormatException nfe) { + // Ignore; not much we can do here... + return; + } + + notificationEngine.dispatchMessage(run, to, new Message() { + private CompletionNotifier getNotifier(String type) { + CompletionNotifier n = typedNotifiers.get(type); + if (n == null) + n = backupNotifier; + return n; + } + + @Override + public String getContent(String type) { + return getNotifier(type).makeCompletionMessage(name, run, code); + } + + @Override + public String getTitle(String type) { + return getNotifier(type).makeMessageSubject(name, run, code); + } + }); + } + + @Override + public FactoryBean getFactory() { + return factory; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java new file mode 100644 index 0000000..51931c0 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java @@ -0,0 +1,306 @@ +/* + * Copyright (C) 
2010-2013 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.worker; + +import static org.taverna.server.master.worker.RunConnection.toDBform; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.jdo.annotations.PersistenceAware; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; +import org.taverna.server.master.interfaces.Policy; +import org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.utils.CallTimeLogger.PerfLogged; +import org.taverna.server.master.utils.JDOSupport; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * This handles storing runs, interfacing with the underlying state engine as + * necessary. + * + * @author Donal Fellows + */ +@PersistenceAware +public class RunDatabaseDAO extends JDOSupport<RunConnection> { + public RunDatabaseDAO() { + super(RunConnection.class); + } + + private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB"); + private RunDatabase facade; + + @Required + public void setFacade(RunDatabase facade) { + this.facade = facade; + } + + // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + + @SuppressWarnings("unchecked") + private List<String> nameRuns() { + if (log.isDebugEnabled()) + log.debug("fetching all run names"); + return (List<String>) namedQuery("names").execute(); + } + + /** + * @return The number of workflow runs in the database. 
+ */ + @WithinSingleTransaction + public int countRuns() { + if (log.isDebugEnabled()) + log.debug("counting the number of runs"); + return (Integer) namedQuery("count").execute(); + } + + @SuppressWarnings("unchecked") + private List<String> expiredRuns() { + return (List<String>) namedQuery("timedout").execute(); + } + + @SuppressWarnings("unchecked") + private List<String> unterminatedRuns() { + return (List<String>) namedQuery("unterminated").execute(); + } + + @Nullable + private RunConnection pickRun(@Nonnull String name) { + if (log.isDebugEnabled()) + log.debug("fetching the run called " + name); + try { + RunConnection rc = getById(name); + if (rc == null) + log.warn("no result for " + name); + return rc; + } catch (RuntimeException e) { + log.warn("problem in fetch", e); + throw e; + } + } + + @Nullable + @WithinSingleTransaction + public String getSecurityToken(@Nonnull String name) { + RunConnection rc = getById(name); + if (rc == null) + return null; + return rc.getSecurityToken(); + } + + private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException { + persist(toDBform(rrd)); + } + + @Nonnull + private List<RunConnection> allRuns() { + try { + List<RunConnection> rcs = new ArrayList<>(); + List<String> names = nameRuns(); + for (String id : names) { + try { + if (id != null) + rcs.add(pickRun(id)); + } catch (RuntimeException e) { + continue; + } + } + return rcs; + } catch (RuntimeException e) { + log.warn("problem in fetch", e); + throw e; + } + } + + // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + + /** + * Obtain a workflow run handle. + * + * @param name + * The identifier of the run. + * @return The run handle, or <tt>null</tt> if there is no such run. + */ + @Nullable + @WithinSingleTransaction + public TavernaRun get(String name) { + try { + RunConnection rc = pickRun(name); + return (rc == null) ? 
null : rc.fromDBform(facade); + } catch (Exception e) { + return null; + } + } + + /** + * Get the runs that a user can read things from. + * + * @param user + * Who is asking? + * @param p + * The policy that determines what they can see. + * @return A mapping from run IDs to run handles. + */ + @Nonnull + @WithinSingleTransaction + public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) { + Map<String, TavernaRun> result = new HashMap<>(); + for (String id : nameRuns()) + try { + RemoteRunDelegate rrd = pickRun(id).fromDBform(facade); + if (p.permitAccess(user, rrd)) + result.put(id, rrd); + } catch (Exception e) { + continue; + } + return result; + } + + /** + * @return A list of the IDs for all workflow runs. + */ + @Nonnull + @WithinSingleTransaction + public List<String> listRunNames() { + List<String> runNames = new ArrayList<>(); + for (RunConnection rc : allRuns()) + if (rc.getId() != null) + runNames.add(rc.getId()); + return runNames; + } + + /** + * @return An arbitrary, representative workflow run. + * @throws Exception + * If anything goes wrong. + */ + @Nullable + @WithinSingleTransaction + public RemoteRunDelegate pickArbitraryRun() throws Exception { + for (RunConnection rc : allRuns()) { + if (rc.getId() == null) + continue; + return rc.fromDBform(facade); + } + return null; + } + + /** + * Make a workflow run persistent. Must only be called once per workflow + * run. + * + * @param rrd + * The workflow run to persist. + * @throws IOException + * If anything goes wrong with serialisation of the run. + */ + @WithinSingleTransaction + public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException { + persist(rrd); + } + + /** + * Stop a workflow run from being persistent. + * + * @param name + * The ID of the run. + * @return Whether a deletion happened. 
+ */ + @WithinSingleTransaction + public boolean unpersistRun(String name) { + RunConnection rc = pickRun(name); + if (rc != null) + delete(rc); + return rc != null; + } + + /** + * Ensure that the given workflow run is synchronized with the database. + * + * @param run + * The run to synchronise. + * @throws IOException + * If serialization of anything fails. + */ + @WithinSingleTransaction + public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException { + getById(run.id).makeChanges(run); + } + + /** + * Remove all workflow runs that have expired. + * + * @return The ids of the deleted runs. + */ + @Nonnull + @PerfLogged + @WithinSingleTransaction + public List<String> doClean() { + if (log.isDebugEnabled()) + log.debug("deleting runs that timed out before " + new Date()); + List<String> toDelete = expiredRuns(); + if (log.isDebugEnabled()) + log.debug("found " + toDelete.size() + " runs to delete"); + for (String id : toDelete) { + RunConnection rc = getById(id); + try { + rc.fromDBform(facade).run.destroy(); + } catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("failed to delete execution resource for " + id, + e); + } + delete(rc); + } + return toDelete; + } + + /** + * @return A list of workflow runs that are candidates for doing + * notification of termination. 
+ */ + @Nonnull + @PerfLogged + @WithinSingleTransaction + public List<RemoteRunDelegate> getPotentiallyNotifiable() { + List<RemoteRunDelegate> toNotify = new ArrayList<>(); + for (String id : unterminatedRuns()) + try { + RunConnection rc = getById(id); + toNotify.add(rc.fromDBform(facade)); + } catch (Exception e) { + log.warn("failed to fetch connection token" + + "for notification of completion check", e); + } + return toNotify; + } + + @PerfLogged + @WithinSingleTransaction + public void markFinished(@Nonnull Set<String> terminated) { + for (String id : terminated) { + RunConnection rc = getById(id); + if (rc == null) + continue; + try { + rc.fromDBform(facade).doneTransitionToFinished = true; + rc.setFinished(true); + } catch (Exception e) { + log.warn("failed to note termination", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java new file mode 100644 index 0000000..29ac884 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java @@ -0,0 +1,395 @@ +package org.taverna.server.master.worker; + +import static org.springframework.jmx.support.MetricType.COUNTER; +import static org.springframework.jmx.support.MetricType.GAUGE; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; + +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.PreDestroy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import 
org.springframework.core.annotation.Order;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedMetric;
+import org.springframework.jmx.export.annotation.ManagedResource;
+import org.taverna.server.master.factories.ConfigurableRunFactory;
+import org.taverna.server.master.localworker.LocalWorkerState;
+
+/**
+ * The JMX-managed configuration facet of a run factory. Configuration values
+ * are delegated to the {@link LocalWorkerState} bean; setters for values
+ * that affect already-running helper processes additionally ask the subclass
+ * to reinitialise the registry and/or factory.
+ */
+@ManagedResource(objectName = JMX_ROOT + "Factory", description = "The factory for runs.")
+public abstract class RunFactoryConfiguration implements ConfigurableRunFactory {
+    protected Log log = LogFactory.getLog("Taverna.Server.Worker");
+    protected LocalWorkerState state;
+    protected RunDBSupport runDB;
+    // Only touched from synchronized methods (incrementRunCount/getTotalRuns)
+    private int totalRuns = 0;
+
+    /** Drop the logger reference when the bean is being destroyed. */
+    @PreDestroy
+    void closeLog() {
+        log = null;
+    }
+
+    @Autowired(required = true)
+    @Order(0)
+    void setState(LocalWorkerState state) {
+        this.state = state;
+    }
+
+    @Autowired(required = true)
+    @Order(0)
+    void setRunDB(RunDBSupport runDB) {
+        this.runDB = runDB;
+    }
+
+    /**
+     * Drop any current references to the registry of runs, and kill off that
+     * process.
+     */
+    protected abstract void reinitRegistry();
+
+    /**
+     * Drop any current references to the run factory subprocess and kill it
+     * off.
+     */
+    protected abstract void reinitFactory();
+
+    /** Count the number of operating runs. */
+    protected abstract int operatingCount() throws Exception;
+
+    /** Note that one more run has been spawned. */
+    protected final synchronized void incrementRunCount() {
+        totalRuns++;
+    }
+
+    /**
+     * @return Whether the operating limit is greater than the current
+     *         operating count; <tt>false</tt> if the count cannot be
+     *         determined.
+     */
+    @Override
+    @ManagedAttribute(description = "Whether it is allowed to start a run executing.", currencyTimeLimit = 30)
+    public final boolean isAllowingRunsToStart() {
+        try {
+            return state.getOperatingLimit() > getOperatingCount();
+        } catch (Exception e) {
+            log.info("failed to get operating run count", e);
+            return false;
+        }
+    }
+
+    @Override
+    @ManagedAttribute(description = "The host holding the RMI registry to communicate via.")
+    public final String getRegistryHost() {
+        return state.getRegistryHost();
+    }
+
+    @Override
+    @ManagedAttribute(description = "The host holding the RMI registry to communicate via.")
+    public final void setRegistryHost(String host) {
+        state.setRegistryHost(host);
+        reinitRegistry();
+        reinitFactory();
+    }
+
+    @Override
+    @ManagedAttribute(description = "The port number of the RMI registry. Should not normally be set.")
+    public final int getRegistryPort() {
+        return state.getRegistryPort();
+    }
+
+    @Override
+    @ManagedAttribute(description = "The port number of the RMI registry. Should not normally be set.")
+    public final void setRegistryPort(int port) {
+        state.setRegistryPort(port);
+        reinitRegistry();
+        reinitFactory();
+    }
+
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "What JAR do we use to start the RMI registry process?")
+    public final String getRmiRegistryJar() {
+        return state.getRegistryJar();
+    }
+
+    @Override
+    @ManagedAttribute(description = "What JAR do we use to start the RMI registry process?")
+    public final void setRmiRegistryJar(String rmiRegistryJar) {
+        state.setRegistryJar(rmiRegistryJar);
+        reinitRegistry();
+        reinitFactory();
+    }
+
+    @Override
+    @ManagedAttribute(description = "The maximum number of simultaneous runs supported by the server.", currencyTimeLimit = 300)
+    public final int getMaxRuns() {
+        return state.getMaxRuns();
+    }
+
+    @Override
+    @ManagedAttribute(description = "The maximum number of simultaneous runs supported by the server.", currencyTimeLimit = 300)
+    public final void setMaxRuns(int maxRuns) {
+        state.setMaxRuns(maxRuns);
+    }
+
+    /** @return How many minutes should a workflow live by default? */
+    @Override
+    @ManagedAttribute(description = "How many minutes should a workflow live by default?", currencyTimeLimit = 300)
+    public final int getDefaultLifetime() {
+        return state.getDefaultLifetime();
+    }
+
+    /**
+     * Set how long a workflow should live by default.
+     *
+     * @param defaultLifetime
+     *            Default lifetime, in minutes.
+     */
+    @Override
+    @ManagedAttribute(description = "How many minutes should a workflow live by default?", currencyTimeLimit = 300)
+    public final void setDefaultLifetime(int defaultLifetime) {
+        state.setDefaultLifetime(defaultLifetime);
+    }
+
+    /**
+     * @return How many milliseconds to wait between checks to see if a worker
+     *         process has registered.
+     */
+    @Override
+    @ManagedAttribute(description = "How many milliseconds to wait between checks to see if a worker process has registered.", currencyTimeLimit = 300)
+    public final int getSleepTime() {
+        return state.getSleepMS();
+    }
+
+    /**
+     * @param sleepTime
+     *            How many milliseconds to wait between checks to see if a
+     *            worker process has registered.
+     */
+    @Override
+    @ManagedAttribute(description = "How many milliseconds to wait between checks to see if a worker process has registered.", currencyTimeLimit = 300)
+    public final void setSleepTime(int sleepTime) {
+        state.setSleepMS(sleepTime);
+    }
+
+    /**
+     * @return How many seconds to wait for a worker process to register itself.
+     */
+    @Override
+    @ManagedAttribute(description = "How many seconds to wait for a worker process to register itself.", currencyTimeLimit = 300)
+    public final int getWaitSeconds() {
+        return state.getWaitSeconds();
+    }
+
+    /**
+     * @param seconds
+     *            How many seconds to wait for a worker process to register
+     *            itself.
+     */
+    @Override
+    @ManagedAttribute(description = "How many seconds to wait for a worker process to register itself.", currencyTimeLimit = 300)
+    public final void setWaitSeconds(int seconds) {
+        state.setWaitSeconds(seconds);
+    }
+
+    /** @return The script to run to start running a workflow. */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "The script to run to start running a workflow.", currencyTimeLimit = 300)
+    public final String getExecuteWorkflowScript() {
+        return state.getExecuteWorkflowScript();
+    }
+
+    /**
+     * @param executeWorkflowScript
+     *            The script to run to start running a workflow.
+     */
+    @Override
+    @ManagedAttribute(description = "The script to run to start running a workflow.", currencyTimeLimit = 300)
+    public final void setExecuteWorkflowScript(String executeWorkflowScript) {
+        state.setExecuteWorkflowScript(executeWorkflowScript);
+        reinitFactory();
+    }
+
+    /** @return The location of the JAR implementing the server worker processes. */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "The location of the JAR implementing the server worker processes.")
+    public final String getServerWorkerJar() {
+        return state.getServerWorkerJar();
+    }
+
+    /**
+     * @param serverWorkerJar
+     *            The location of the JAR implementing the server worker
+     *            processes.
+     */
+    @Override
+    @ManagedAttribute(description = "The location of the JAR implementing the server worker processes.")
+    public final void setServerWorkerJar(String serverWorkerJar) {
+        state.setServerWorkerJar(serverWorkerJar);
+        reinitFactory();
+    }
+
+    /** @return The list of additional arguments used to make a worker process. */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "The list of additional arguments used to make a worker process.", currencyTimeLimit = 300)
+    public final String[] getExtraArguments() {
+        return state.getExtraArgs();
+    }
+
+    /**
+     * @param extraArguments
+     *            The list of additional arguments used to make a worker
+     *            process.
+     */
+    @Override
+    @ManagedAttribute(description = "The list of additional arguments used to make a worker process.", currencyTimeLimit = 300)
+    public final void setExtraArguments(@Nonnull String[] extraArguments) {
+        state.setExtraArgs(extraArguments);
+        reinitFactory();
+    }
+
+    /** @return Which java executable to run. */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "Which java executable to run.", currencyTimeLimit = 300)
+    public final String getJavaBinary() {
+        return state.getJavaBinary();
+    }
+
+    /**
+     * @param javaBinary
+     *            Which java executable to run.
+     */
+    @Override
+    @ManagedAttribute(description = "Which java executable to run.", currencyTimeLimit = 300)
+    public final void setJavaBinary(@Nonnull String javaBinary) {
+        state.setJavaBinary(javaBinary);
+        reinitFactory();
+    }
+
+    /**
+     * @return A file containing a password to use when running a program as
+     *         another user (e.g., with sudo).
+     */
+    @Nullable
+    @Override
+    @ManagedAttribute(description = "A file containing a password to use when running a program as another user (e.g., with sudo).", currencyTimeLimit = 300)
+    public final String getPasswordFile() {
+        return state.getPasswordFile();
+    }
+
+    /**
+     * @param passwordFile
+     *            A file containing a password to use when running a program as
+     *            another user (e.g., with sudo).
+     */
+    @Override
+    @ManagedAttribute(description = "A file containing a password to use when running a program as another user (e.g., with sudo).", currencyTimeLimit = 300)
+    public final void setPasswordFile(@Nullable String passwordFile) {
+        state.setPasswordFile(passwordFile);
+        reinitFactory();
+    }
+
+    /**
+     * @return The location of the JAR implementing the secure-fork process.
+     */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "The location of the JAR implementing the secure-fork process.", currencyTimeLimit = 300)
+    public final String getServerForkerJar() {
+        return state.getServerForkerJar();
+    }
+
+    /**
+     * @param forkerJarFilename
+     *            The location of the JAR implementing the secure-fork process.
+     */
+    @Override
+    @ManagedAttribute(description = "The location of the JAR implementing the secure-fork process.", currencyTimeLimit = 300)
+    public final void setServerForkerJar(String forkerJarFilename) {
+        state.setServerForkerJar(forkerJarFilename);
+        reinitFactory();
+    }
+
+    /**
+     * @return How many times has a workflow run been spawned by this engine.
+     *         Restarts reset this counter.
+     */
+    @Override
+    @ManagedMetric(description = "How many times has a workflow run been spawned by this engine.", currencyTimeLimit = 10, metricType = COUNTER, category = "throughput")
+    public final synchronized int getTotalRuns() {
+        return totalRuns;
+    }
+
+    /**
+     * @return How many checks were done for the worker process the last time a
+     *         spawn was tried.
+     */
+    @Override
+    @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60)
+    public abstract int getLastStartupCheckCount();
+
+    /** @return The names of the current runs, as listed by the run database. */
+    @Nonnull
+    @Override
+    @ManagedAttribute(description = "The names of the current runs.", currencyTimeLimit = 5)
+    public final String[] getCurrentRunNames() {
+        List<String> names = runDB.listRunNames();
+        return names.toArray(new String[names.size()]);
+    }
+
+    /**
+     * @return What the factory subprocess's main RMI interface is registered
+     *         as.
+     */
+    @Override
+    @ManagedAttribute(description = "What the factory subprocess's main RMI interface is registered as.", currencyTimeLimit = 60)
+    public abstract String getFactoryProcessName();
+
+    /**
+     * @return What was the exit code from the last time the factory subprocess
+     *         was killed?
+     */
+    @Override
+    @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?")
+    public abstract Integer getLastExitCode();
+
+    /**
+     * @return The mapping of user names to RMI factory IDs.
+     */
+    @Override
+    @ManagedAttribute(description = "The mapping of user names to RMI factory IDs.", currencyTimeLimit = 60)
+    public abstract String[] getFactoryProcessMapping();
+
+    /**
+     * @param operatingLimit
+     *            The maximum number of simultaneous operating runs supported
+     *            by the server.
+     */
+    @Override
+    @ManagedAttribute(description = "The maximum number of simultaneous operating runs supported by the server.", currencyTimeLimit = 300)
+    public final void setOperatingLimit(int operatingLimit) {
+        state.setOperatingLimit(operatingLimit);
+    }
+
+    /**
+     * @return The maximum number of simultaneous operating runs supported by
+     *         the server.
+     */
+    @Override
+    @ManagedAttribute(description = "The maximum number of simultaneous operating runs supported by the server.", currencyTimeLimit = 300)
+    public final int getOperatingLimit() {
+        return state.getOperatingLimit();
+    }
+
+    /**
+     * @return A count of the number of runs believed to actually be in the
+     *         {@linkplain org.taverna.server.master.common.Status#Operating
+     *         operating} state.
+     * @throws Exception
+     *             If anything goes wrong.
+     */
+    @Override
+    @ManagedMetric(description = "How many workflow runs are currently actually executing.", currencyTimeLimit = 10, metricType = GAUGE, category = "throughput")
+    public final int getOperatingCount() throws Exception {
+        return operatingCount();
+    }
+
+    /**
+     * @param genProv
+     *            Whether to tell a workflow to generate provenance bundles by
+     *            default.
+     */
+    @Override
+    @ManagedAttribute(description="Whether to tell a workflow to generate provenance bundles by default.")
+    public final void setGenerateProvenance(boolean genProv) {
+        state.setGenerateProvenance(genProv);
+    }
+
+    /**
+     * @return Whether to tell a workflow to generate provenance bundles by
+     *         default.
+     */
+    @Override
+    @ManagedAttribute(description="Whether to tell a workflow to generate provenance bundles by default.")
+    public final boolean getGenerateProvenance() {
+        return state.getGenerateProvenance();
+    }
+}
