http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaRun.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaRun.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaRun.java new file mode 100644 index 0000000..399164d --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaRun.java @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.interfaces; + +import java.io.Serializable; +import java.util.Date; +import java.util.List; + +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.common.Status; +import org.taverna.server.master.exceptions.BadStateChangeException; +import org.taverna.server.master.exceptions.FilesystemAccessException; +import org.taverna.server.master.exceptions.NoDestroyException; +import org.taverna.server.master.exceptions.UnknownRunException; + +/** + * The interface to a taverna workflow run, or "run" for short. + * + * @author Donal Fellows + */ +public interface TavernaRun extends Serializable { + /** + * @return The identifier of the run. + */ + String getId(); + + /** + * @return What was this run was create to execute. + */ + Workflow getWorkflow(); + + /** + * @return The name of the run. + */ + String getName(); + + /** + * @param name + * The new name of the run. May be truncated. + */ + void setName(String name); + + /** + * @return The name of the Baclava file to use for all inputs, or + * <tt>null</tt> if no Baclava file is set. + */ + String getInputBaclavaFile(); + + /** + * Sets the Baclava file to use for all inputs. This overrides the use of + * individual inputs. + * + * @param filename + * The filename to use. Must not start with a <tt>/</tt> or + * contain any <tt>..</tt> segments. Will be interpreted relative + * to the run's working directory. + * @throws FilesystemAccessException + * If the filename is invalid. + * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + void setInputBaclavaFile(String filename) throws FilesystemAccessException, + BadStateChangeException; + + /** + * @return The list of input assignments. + */ + List<Input> getInputs(); + + /** + * Create an input assignment. + * + * @param name + * The name of the port that this will be an input for. + * @return The assignment reference. + * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + Input makeInput(String name) throws BadStateChangeException; + + /** + * @return The file (relative to the working directory) to write the outputs + * of the run to as a Baclava document, or <tt>null</tt> if they are + * to be written to non-Baclava files in a directory called + * <tt>out</tt>. + */ + String getOutputBaclavaFile(); + + /** + * Sets where the output of the run is to be written to. This will cause the + * output to be generated as a Baclava document, rather than a collection of + * individual non-Baclava files in the subdirectory of the working directory + * called <tt>out</tt>. + * + * @param filename + * Where to write the Baclava file (or <tt>null</tt> to cause the + * output to be written to individual files); overwrites any + * previous setting of this value. + * @throws FilesystemAccessException + * If the filename starts with a <tt>/</tt> or contains a + * <tt>..</tt> segment. + * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + void setOutputBaclavaFile(String filename) + throws FilesystemAccessException, BadStateChangeException; + + /** + * @return When this run will expire, becoming eligible for automated + * deletion. + */ + Date getExpiry(); + + /** + * Set when this run will expire. + * + * @param d + * Expiry time. Deletion will happen some time after that. + */ + void setExpiry(Date d); + + /** + * @return The current status of the run. + */ + Status getStatus(); + + /** + * Set the status of the run, which should cause it to move into the given + * state. This may cause some significant changes. + * + * @param s + * The state to try to change to. + * @return <tt>null</tt>, or a string describing the incomplete state change + * if the operation has internally timed out. + * @throws BadStateChangeException + * If the change to the given state is impossible. + */ + String setStatus(Status s) throws BadStateChangeException; + + /** + * @return Handle to the main working directory of the run. + * @throws FilesystemAccessException + */ + Directory getWorkingDirectory() throws FilesystemAccessException; + + /** + * @return The list of listener instances attached to the run. + */ + List<Listener> getListeners(); + + /** + * Add a listener to the run. + * + * @param listener + * The listener to add. + */ + void addListener(Listener listener); + + /** + * @return The security context structure for this run. + */ + TavernaSecurityContext getSecurityContext(); + + /** + * Kill off this run, removing all resources which it consumes. + * + * @throws NoDestroyException + * If the destruction failed. + */ + void destroy() throws NoDestroyException; + + /** + * @return When this workflow run was created. + */ + Date getCreationTimestamp(); + + /** + * @return When this workflow run was started, or <tt>null</tt> if it has + * never been started. + */ + Date getStartTimestamp(); + + /** + * @return When this workflow run was found to have finished, or + * <tt>null</tt> if it has never finished (either still running or + * never started). + */ + Date getFinishTimestamp(); + + /** + * Test if this run is really there. + * + * <p> + * <i>Implementation note:</i> Used to test communication fabrics, etc. so + * implementations of this interface that do not delegate to another object + * should do nothing. + * + * @throws UnknownRunException + * If things fail. + */ + void ping() throws UnknownRunException; + + /** + * @return whether the run generates provenance data + */ + boolean getGenerateProvenance(); + + /** + * @param generateProvenance + * whether the run generates provenance data + */ + void setGenerateProvenance(boolean generateProvenance); +}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java new file mode 100644 index 0000000..b227bfa --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java @@ -0,0 +1,213 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.interfaces; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.Principal; +import java.util.Set; + +import javax.ws.rs.core.HttpHeaders; +import javax.xml.ws.handler.MessageContext; + +import org.springframework.security.core.context.SecurityContext; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.master.common.Credential; +import org.taverna.server.master.common.Trust; +import org.taverna.server.master.exceptions.InvalidCredentialException; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Security context for a workflow run. + * + * @author Donal Fellows + */ +public interface TavernaSecurityContext { + /** + * @return Who owns the security context. + */ + UsernamePrincipal getOwner(); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may destroy the run or manipulate its + * lifetime. + * + * @return The names of the users who may use destroy operations. Read-only. + */ + Set<String> getPermittedDestroyers(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may destroy the run or manipulate its + * lifetime. + * + * @param destroyers + * The names of the users who may use destroy operations. + */ + void setPermittedDestroyers(Set<String> destroyers); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may update the run (including writing to + * files). + * + * @return The names of the users who may use update operations. Read-only. + */ + Set<String> getPermittedUpdaters(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may update the run (including writing to + * its files). + * + * @param updaters + * The names of the users who may use update operations. + */ + void setPermittedUpdaters(Set<String> updaters); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may read from the run (including its + * files). + * + * @return The names of the users who may use read operations. Read-only. + */ + Set<String> getPermittedReaders(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may read from the run (including its + * files). + * + * @param readers + * The names of the users who may use read operations. + */ + void setPermittedReaders(Set<String> readers); + + /** + * @return The credentials owned by the user. Never <tt>null</tt>. + */ + Credential[] getCredentials(); + + /** + * Add a credential to the owned set or replaces the old version with the + * new one. + * + * @param toAdd + * The credential to add. + */ + void addCredential(Credential toAdd); + + /** + * Remove a credential from the owned set. It's not a failure to remove + * something that isn't in the set. + * + * @param toDelete + * The credential to remove. + */ + void deleteCredential(Credential toDelete); + + /** + * Tests if the credential is valid. This includes testing whether the + * underlying credential file exists and can be unlocked by the password in + * the {@link Credential} object. + * + * @param c + * The credential object to validate. + * @throws InvalidCredentialException + * If it is invalid. + */ + void validateCredential(Credential c) throws InvalidCredentialException; + + /** + * @return The identities trusted by the user. Never <tt>null</tt>. + */ + Trust[] getTrusted(); + + /** + * Add an identity to the trusted set. + * + * @param toAdd + * The identity to add. + */ + void addTrusted(Trust toAdd); + + /** + * Remove an identity from the trusted set. It's not a failure to remove + * something that isn't in the set. + * + * @param toDelete + * The identity to remove. + */ + void deleteTrusted(Trust toDelete); + + /** + * Tests if the trusted identity descriptor is valid. This includes checking + * whether the underlying trusted identity file exists. + * + * @param t + * The trusted identity descriptor to check. + * @throws InvalidCredentialException + * If it is invalid. + */ + void validateTrusted(Trust t) throws InvalidCredentialException; + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param securityContext + * The security context associated with the request that caused + * the workflow to be created. + * @throws Exception + * If anything goes wrong. + */ + void initializeSecurityFromContext(SecurityContext securityContext) + throws Exception; + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param context + * The full information about the request that caused the + * workflow to be created. + */ + void initializeSecurityFromSOAPContext(MessageContext context); + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param headers + * The full information about the request that caused the + * workflow to be created. + */ + void initializeSecurityFromRESTContext(HttpHeaders headers); + + /** + * Transfer the security context to the remote system. + * + * @throws IOException + * If the communication fails. + * @throws GeneralSecurityException + * If the assembly of the context fails. + * @throws ImplementationException + * If the local worker has problems with creating the realized + * security context. + */ + void conveySecurity() throws GeneralSecurityException, IOException, + ImplementationException; + + /** + * @return The factory that created this security context. + */ + SecurityContextFactory getFactory(); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java new file mode 100644 index 0000000..e5cd02c --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2010-2013 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.interfaces; + +import java.net.URI; + +import javax.ws.rs.core.UriBuilder; + +/** + * How to manufacture URIs to workflow runs. + * + * @author Donal Fellows + */ +public interface UriBuilderFactory { + /** + * Given a run, get a factory for RESTful URIs to resources associated + * with it. + * + * @param run + * The run in question. + * @return The {@link URI} factory. + */ + UriBuilder getRunUriBuilder(TavernaRun run); + + /** + * @return a URI factory that is preconfigured to point to the base of + * the webapp. + */ + UriBuilder getBaseUriBuilder(); + + /** + * Resolves a URI with respect to the base URI of the factory. + * + * @param uri + * The URI to resolve, or <tt>null</tt>. + * @return The resolved URI, or <tt>null</tt> if <b>uri</b> is + * <tt>null</tt>. + */ + String resolve(String uri); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java new file mode 100644 index 0000000..cfbbe79 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +/** + * Interfaces to the main worker classes that provide the magical power + * that drives the webapp front-end. + */ +package org.taverna.server.master.interfaces; http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java new file mode 100644 index 0000000..fc7f881 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java @@ -0,0 +1,440 @@ +/* + * Copyright (C) 2010-2013 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import static java.lang.System.getSecurityManager; +import static java.lang.System.setProperty; +import static java.lang.System.setSecurityManager; +import static java.rmi.registry.LocateRegistry.createRegistry; +import static java.rmi.registry.LocateRegistry.getRegistry; +import static java.rmi.registry.Registry.REGISTRY_PORT; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; +import static org.taverna.server.master.rest.TavernaServerRunREST.PathNames.DIR; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.URI; +import java.net.URL; +import java.rmi.MarshalledObject; +import java.rmi.RMISecurityManager; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.Resource; +import javax.xml.bind.JAXBException; + +import org.apache.commons.io.IOUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.server.UsageRecordReceiver; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.exceptions.NoListenerException; +import org.taverna.server.master.factories.ListenerFactory; +import org.taverna.server.master.factories.RunFactory; +import org.taverna.server.master.interaction.InteractionFeedSupport; +import org.taverna.server.master.interfaces.Listener; +import org.taverna.server.master.interfaces.SecurityContextFactory; +import org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.interfaces.UriBuilderFactory; +import org.taverna.server.master.notification.atom.EventDAO; +import org.taverna.server.master.usage.UsageRecordRecorder; +import org.taverna.server.master.utils.UsernamePrincipal; +import org.taverna.server.master.worker.FactoryBean; +import org.taverna.server.master.worker.RemoteRunDelegate; +import org.taverna.server.master.worker.RunFactoryConfiguration; + +import uk.org.taverna.scufl2.api.io.WriterException; + +/** + * Bridge to remote runs via RMI. + * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "Factory", description = "The factory for runs") +public abstract class AbstractRemoteRunFactory extends RunFactoryConfiguration + implements ListenerFactory, RunFactory, FactoryBean { + /** + * Whether to apply stronger limitations than normal to RMI. It is + * recommended that this be true! + */ + @Value("${rmi.localhostOnly}") + private boolean rmiLocalhostOnly; + /** The interaction host name. */ + private String interhost; + /** The interaction port number. */ + private String interport; + private Process registryProcess; + /** + * The interaction WebDAV location. Will be resolved before being passed to + * the back-end. + */ + private String interwebdav; + /** + * The interaction ATOM feed location. Will be resolved before being passed + * to the back-end. + */ + private String interfeed; + /** Used for doing URI resolution. */ + @Resource(name = "webapp") + private UriBuilderFactory baseurifactory; + @Autowired + private InteractionFeedSupport interactionFeedSupport; + + @Value("${taverna.interaction.host}") + void setInteractionHost(String host) { + if (host != null && host.equals("none")) + host = null; + interhost = host; + } + + @Value("${taverna.interaction.port}") + void setInteractionPort(String port) { + if (port != null && port.equals("none")) + port = null; + interport = port; + } + + @Value("${taverna.interaction.webdav_path}") + void setInteractionWebdav(String webdav) { + if (webdav != null && webdav.equals("none")) + webdav = null; + interwebdav = webdav; + } + + @Value("${taverna.interaction.feed_path}") + void setInteractionFeed(String feed) { + if (feed != null && feed.equals("none")) + feed = null; + interfeed = feed; + } + + @Override + protected void reinitRegistry() { + registry = null; + if (registryProcess != null) { + registryProcess.destroy(); + registryProcess = null; + } + } + + protected void initInteractionDetails(RemoteRunFactory factory) + throws RemoteException { + if (interhost != null) { + String feed = baseurifactory.resolve(interfeed); + String webdav = baseurifactory.resolve(interwebdav); + factory.setInteractionServiceDetails(interhost, interport, webdav, + feed); + } + } + + protected static final Process launchSubprocess(ProcessBuilder b) + throws IOException { + Thread t = Thread.currentThread(); + ClassLoader ccl = t.getContextClassLoader(); + try { + t.setContextClassLoader(null); + return b.start(); + } finally { + t.setContextClassLoader(ccl); + } + } + + /** Get a handle to a new instance of the RMI registry. */ + private Registry makeRegistry(int port) throws RemoteException { + ProcessBuilder p = new ProcessBuilder(getJavaBinary()); + p.command().add("-jar"); + p.command().add(getRmiRegistryJar()); + p.command().add(Integer.toString(port)); + p.command().add(Boolean.toString(rmiLocalhostOnly)); + try { + Process proc = launchSubprocess(p); + Thread.sleep(getSleepTime()); + try { + if (proc.exitValue() == 0) + return null; + String error = IOUtils.toString(proc.getErrorStream()); + throw new RemoteException(error); + } catch (IllegalThreadStateException ise) { + // Still running! + } + try (ObjectInputStream ois = new ObjectInputStream( + proc.getInputStream())) { + @SuppressWarnings("unchecked") + Registry r = ((MarshalledObject<Registry>) ois.readObject()) + .get(); + registryProcess = proc; + return r; + } + } catch (RemoteException e) { + throw e; + } catch (ClassNotFoundException e) { + throw new RemoteException("unexpected registry type", e); + } catch (IOException e) { + throw new RemoteException("unexpected IO problem with registry", e); + } catch (InterruptedException e) { + throw new RemoteException("unexpected interrupt"); + } + } + + /** + * @return A handle to the current RMI registry. + */ + protected Registry getTheRegistry() { + try { + if (registry != null) { + registry.list(); + return registry; + } + } catch (RemoteException e) { + log.warn("non-functioning existing registry handle", e); + registry = null; + } + try { + registry = getRegistry(getRegistryHost(), getRegistryPort()); + registry.list(); + return registry; + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle."); + registry = null; + log.warn("Will build new registry, " + + "but service restart ability is at risk."); + try { + registry = makeRegistry(getRegistryPort()); + registry.list(); + return registry; + } catch (RemoteException e2) { + log.error( + "failed to create local working RMI registry on port " + + getRegistryPort(), e2); + log.info("original connection exception", e); + } + } + try { + registry = getRegistry(getRegistryHost(), REGISTRY_PORT); + registry.list(); + return registry; + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle on backup port."); + try { + registry = makeRegistry(REGISTRY_PORT); + registry.list(); + return registry; + } catch (RemoteException e2) { + log.fatal( + "totally failed to get registry handle, even on fallback!", + e2); + log.info("original connection exception", e); + registry = null; + throw new RuntimeException("No RMI Registry Available"); + } + } + } + + private Registry registry; + /** + * The name of the resource that describes the default security policy to + * install. + */ + public static final String SECURITY_POLICY_FILE = "security.policy"; + private SecurityContextFactory securityFactory; + UsageRecordRecorder usageRecordSink; + private EventDAO masterEventFeed; + + @Autowired(required = true) + void setSecurityContextFactory(SecurityContextFactory factory) { + this.securityFactory = factory; + } + + @Autowired(required = true) + void setMasterEventFeed(EventDAO masterEventFeed) { + this.masterEventFeed = masterEventFeed; + } + + @Autowired(required = true) + void setUsageRecordSink(UsageRecordRecorder usageRecordSink) { + this.usageRecordSink = usageRecordSink; + } + + /** + * Configures the Java security model. Not currently used, as it is + * viciously difficult to get right! + */ + @SuppressWarnings("unused") + private static void installSecurityManager() { + if (getSecurityManager() == null) { + setProperty("java.security.policy", AbstractRemoteRunFactory.class + .getClassLoader().getResource(SECURITY_POLICY_FILE) + .toExternalForm()); + setSecurityManager(new RMISecurityManager()); + } + } + + // static { + // installSecurityManager(); + // } + + /** + * Set up the run expiry management engine. + * + * @throws JAXBException + */ + public AbstractRemoteRunFactory() throws JAXBException { + try { + registry = LocateRegistry.getRegistry(); + registry.list(); + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle."); + log.warn("Will build new registry, but service restart ability is at risk."); + try { + registry = createRegistry(REGISTRY_PORT); + registry.list(); + } catch (RemoteException e2) { + log.error("failed to create working RMI registry", e2); + log.info("original connection exception", e); + } + } + } + + @Override + public List<String> getSupportedListenerTypes() { + try { + RemoteRunDelegate rrd = runDB.pickArbitraryRun(); + if (rrd != null) + return rrd.getListenerTypes(); + log.warn("no remote runs; no listener types"); + } catch (Exception e) { + log.warn("failed to get list of listener types", e); + } + return new ArrayList<>(); + } + + @Override + public Listener makeListener(TavernaRun run, String listenerType, + String configuration) throws NoListenerException { + if (run instanceof RemoteRunDelegate) + return ((RemoteRunDelegate) run).makeListener(listenerType, + configuration); + throw new NoListenerException("unexpected run type: " + run.getClass()); + } + + @Override + public TavernaRun create(UsernamePrincipal creator, Workflow workflow) + throws NoCreateException { + try { + Date now = new Date(); + UUID id = randomUUID(); + RemoteSingleRun rsr = getRealRun(creator, workflow, id); + RemoteRunDelegate run = new RemoteRunDelegate(now, workflow, rsr, + state.getDefaultLifetime(), runDB, id, + state.getGenerateProvenance(), this); + run.setSecurityContext(securityFactory.create(run, creator)); + @Nonnull + URI feed = interactionFeedSupport.getFeedURI(run); + @Nonnull + URL feedUrl = feed.toURL(); + @Nonnull + URL webdavUrl = baseurifactory.getRunUriBuilder(run) + .path(DIR + "/interactions").build().toURL(); + @Nullable + URL pub = interactionFeedSupport.getLocalFeedBase(feed); + rsr.setInteractionServiceDetails(feedUrl, webdavUrl, pub); + return run; + } catch (NoCreateException e) { + log.warn("failed to build run instance", e); + throw e; + } catch (Exception e) { + log.warn("failed to build run instance", e); + throw new NoCreateException("failed to build run instance", e); + } + } + + /** + * Gets the RMI connector for a new run. + * + * @param creator + * Who is creating the workflow run. + * @param workflow + * What workflow are they instantiating. + * @param id + * The identity token for the run, newly minted. + * @return The remote interface to the run. + * @throws Exception + * Just about anything can go wrong... + */ + protected abstract RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception; + + /** + * How to convert a wrapped workflow into XML. + * + * @param workflow + * The wrapped workflow. + * @return The XML version of the document. + * @throws JAXBException + * If serialization fails. + */ + protected byte[] serializeWorkflow(Workflow workflow) throws JAXBException { + try { + return workflow.getScufl2Bytes(); + } catch (IOException e) { + throw new JAXBException("problem converting to scufl2", e); + } catch (WriterException e) { + throw new JAXBException("problem converting to scufl2", e); + } + } + + private void acceptUsageRecord(String usageRecord) { + if (usageRecordSink != null) + usageRecordSink.storeUsageRecord(usageRecord); + runDB.checkForFinishNow(); + } + + /** + * Make a Remote object that can act as a consumer for usage records. + * + * @param creator + * + * @return The receiver, or <tt>null</tt> if the construction fails. + */ + protected UsageRecordReceiver makeURReciver(UsernamePrincipal creator) { + try { + @SuppressWarnings("serial") + class URReceiver extends UnicastRemoteObject implements + UsageRecordReceiver { + public URReceiver() throws RemoteException { + super(); + } + + @Override + public void acceptUsageRecord(String usageRecord) { + AbstractRemoteRunFactory.this.acceptUsageRecord(usageRecord); + } + } + return new URReceiver(); + } catch (RemoteException e) { + log.warn("failed to build usage record receiver", e); + return null; + } + } + + @Override + public EventDAO getMasterEventFeed() { + return masterEventFeed; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java new file mode 100644 index 0000000..b67e121 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import static java.lang.System.getProperty; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.Calendar.SECOND; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; + +import java.io.File; +import java.rmi.ConnectException; +import java.rmi.ConnectIOException; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.Calendar; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.xml.bind.JAXBException; + +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.factories.ConfigurableRunFactory; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * A simple factory for workflow runs that forks runs from a subprocess. + * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for simple singleton forked run.") +public class ForkRunFactory extends AbstractRemoteRunFactory implements + ConfigurableRunFactory { + private int lastStartupCheckCount; + private Integer lastExitCode; + private RemoteRunFactory factory; + private Process factoryProcess; + private String factoryProcessName; + + /** + * Create a factory for remote runs that works by forking off a subprocess. + * + * @throws JAXBException + * Shouldn't happen. + */ + public ForkRunFactory() throws JAXBException { + } + + @PostConstruct + protected void initRegistry() { + log.info("waiting for availability of default RMI registry"); + getTheRegistry(); + } + + @Override + protected void reinitFactory() { + boolean makeFactory = factory != null; + killFactory(); + try { + if (makeFactory) + initFactory(); + } catch (Exception e) { + log.fatal("failed to make connection to remote run factory", e); + } + } + + private RemoteRunFactory getFactory() throws RemoteException { + try { + initFactory(); + } catch (RemoteException e) { + throw e; + } catch (Exception e) { + throw new RemoteException("problem constructing factory", e); + } + return factory; + } + + /** + * @return How many checks were done for the worker process the last time a + * spawn was tried. + */ + @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) + @Override + public int getLastStartupCheckCount() { + return lastStartupCheckCount; + } + + /** + * @return What was the exit code from the last time the factory subprocess + * was killed? + */ + @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") + @Override + public Integer getLastExitCode() { + return lastExitCode; + } + + /** + * @return What the factory subprocess's main RMI interface is registered + * as. + */ + @ManagedAttribute(description = "What the factory subprocess's main RMI interface is registered as.", currencyTimeLimit = 60) + @Override + public String getFactoryProcessName() { + return factoryProcessName; + } + + /** + * Makes the subprocess that manufactures runs. + * + * @throws Exception + * If anything goes wrong. + */ + public void initFactory() throws Exception { + if (factory != null) + return; + // Generate the arguments to use when spawning the subprocess + factoryProcessName = state.getFactoryProcessNamePrefix() + randomUUID(); + ProcessBuilder p = new ProcessBuilder(getJavaBinary()); + p.command().add("-jar"); + p.command().add(getServerWorkerJar()); + if (getExecuteWorkflowScript() == null) + log.fatal("no execute workflow script"); + p.command().add(getExecuteWorkflowScript()); + p.command().addAll(asList(getExtraArguments())); + p.command().add(factoryProcessName); + p.redirectErrorStream(true); + p.directory(new File(getProperty("javax.servlet.context.tempdir", + getProperty("java.io.tmpdir")))); + + // Spawn the subprocess + log.info("about to create subprocess: " + p.command()); + + factoryProcess = launchSubprocess(p); + outlog = new StreamLogger("FactoryStdout", factoryProcess.getInputStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + errlog = new StreamLogger("FactoryStderr", factoryProcess.getErrorStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + + // Wait for the subprocess to register itself in the RMI registry + Calendar deadline = Calendar.getInstance(); + deadline.add(SECOND, state.getWaitSeconds()); + Exception lastException = null; + lastStartupCheckCount = 0; + while (deadline.after(Calendar.getInstance())) { + try { + sleep(state.getSleepMS()); + lastStartupCheckCount++; + factory = getRemoteFactoryHandle(factoryProcessName); + initInteractionDetails(factory); + return; + } catch (InterruptedException ie) { + continue; + } catch (NotBoundException nbe) { + lastException = nbe; + log.info("resource \"" + factoryProcessName + + "\" not yet registered..."); + continue; + } catch (RemoteException re) { + // Unpack a remote exception if we can + lastException = re; + try { + if (re.getCause() != null) + lastException = (Exception) re.getCause(); + } catch (Throwable t) { + // Ignore! + } + } catch (RuntimeException e) { + lastException = e; + } + } + if (lastException == null) + lastException = new InterruptedException(); + throw lastException; + } + + private StreamLogger outlog, errlog; + + private void stopLoggers() { + if (outlog != null) + outlog.stop(); + outlog = null; + if (errlog != null) + errlog.stop(); + errlog = null; + } + + private RemoteRunFactory getRemoteFactoryHandle(String name) + throws RemoteException, NotBoundException { + log.info("about to look up resource called " + name); + try { + // Validate registry connection first + getTheRegistry().list(); + } catch (ConnectException | ConnectIOException e) { + log.warn("connection problems with registry", e); + } + RemoteRunFactory rrf = (RemoteRunFactory) getTheRegistry().lookup(name); + log.info("successfully connected to factory subprocess " + + factoryProcessName); + return rrf; + } + + /** + * Destroys the subprocess that manufactures runs. + */ + @PreDestroy + public void killFactory() { + if (factory != null) { + log.info("requesting shutdown of " + factoryProcessName); + try { + factory.shutdown(); + sleep(700); + } catch (RemoteException e) { + log.warn(factoryProcessName + " failed to shut down nicely", e); + } catch (InterruptedException e) { + if (log.isDebugEnabled()) + log.debug("interrupted during wait after asking " + + factoryProcessName + " to shut down", e); + } finally { + factory = null; + } + } + + if (factoryProcess != null) { + int code = -1; + try { + lastExitCode = code = factoryProcess.exitValue(); + log.info(factoryProcessName + " already dead?"); + } catch (RuntimeException e) { + log.info("trying to force death of " + factoryProcessName); + try { + factoryProcess.destroy(); + sleep(350); // takes a little time, even normally + lastExitCode = code = factoryProcess.exitValue(); + } catch (Exception e2) { + code = -1; + } + } finally { + factoryProcess = null; + stopLoggers(); + } + if (code > 128) { + log.info(factoryProcessName + " died with signal=" + + (code - 128)); + } else if (code >= 0) { + log.info(factoryProcessName + " process killed: code=" + code); + } else { + log.warn(factoryProcessName + " not yet dead"); + } + } + } + + /** + * The real core of the run builder, factored out from its reliability + * support. + * + * @param creator + * Who created this workflow? + * @param wf + * The serialized workflow. + * @return The remote handle of the workflow run. + * @throws RemoteException + * If anything fails (communications error, etc.) + */ + private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, + @Nonnull byte[] wf, UUID id) throws RemoteException { + @Nonnull + String globaluser = "Unknown Person"; + if (creator != null) + globaluser = creator.getName(); + RemoteSingleRun rsr = getFactory().make(wf, globaluser, + makeURReciver(creator), id); + incrementRunCount(); + return rsr; + } + + @Override + protected RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception { + @Nonnull + byte[] wf = serializeWorkflow(workflow); + for (int i = 0; i < 3; i++) { + initFactory(); + try { + return getRealRun(creator, wf, id); + } catch (ConnectException | ConnectIOException e) { + // factory was lost; try to recreate + } + killFactory(); + } + throw new NoCreateException("total failure to connect to factory " + + factoryProcessName + "despite attempting restart"); + } + + @Override + public String[] getFactoryProcessMapping() { + return new String[0]; + } + + @Override + protected int operatingCount() throws Exception { + return getFactory().countOperatingRuns(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java new file mode 100644 index 0000000..e449373 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java @@ -0,0 +1,516 @@ +/* + * Copyright (C) 2010-2012 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import static java.lang.System.getProperty; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.Calendar.SECOND; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; +import static org.taverna.server.master.localworker.AbstractRemoteRunFactory.launchSubprocess; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.rmi.ConnectException; +import java.rmi.ConnectIOException; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.xml.bind.JAXBException; + +import org.apache.commons.logging.Log; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.factories.ConfigurableRunFactory; +import org.taverna.server.master.interfaces.LocalIdentityMapper; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * A simple factory for workflow runs that forks runs from a subprocess. + * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for a user-specific forked run.") +public class IdAwareForkRunFactory extends AbstractRemoteRunFactory implements + ConfigurableRunFactory { + private MetaFactory forker; + private Map<String, RemoteRunFactory> factory; + private Map<String, String> factoryProcessName; + + /** + * Create a factory for remote runs that works by forking off a subprocess. + * + * @throws JAXBException + * Shouldn't happen. + */ + public IdAwareForkRunFactory() throws JAXBException { + factory = new HashMap<>(); + factoryProcessName = new HashMap<>(); + } + + @Override + protected void reinitFactory() { + boolean makeForker = forker != null; + try { + killForker(); + } catch (Exception e) { + log.warn("exception when killing secure-fork process", e); + } + try { + if (makeForker) + initMetaFactory(); + } catch (Exception e) { + log.fatal("failed to make secure-fork process", e); + } + } + + /** + * @return How many checks were done for the worker process the last time a + * spawn was tried. + */ + @Override + @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) + public int getLastStartupCheckCount() { + return forker == null ? 0 : forker.lastStartupCheckCount(); + } + + /** + * @return What was the exit code from the last time the factory subprocess + * was killed? + */ + @Override + @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") + public Integer getLastExitCode() { + return forker == null ? null : forker.lastExitCode(); + } + + /** + * @return The mapping of user names to RMI factory IDs. + */ + @Override + @ManagedAttribute(description = "The mapping of user names to RMI factory IDs.", currencyTimeLimit = 60) + public String[] getFactoryProcessMapping() { + ArrayList<String> result = new ArrayList<>(); + ArrayList<String> keys = new ArrayList<>(factoryProcessName.keySet()); + String[] ks = keys.toArray(new String[keys.size()]); + Arrays.sort(ks); + for (String k : ks) { + result.add(k); + result.add(factoryProcessName.get(k)); + } + return result.toArray(new String[result.size()]); + } + + /** + * How construction of factories is actually done. + * + * @author Donal Fellows + */ + public interface MetaFactory { + /** + * Make a factory for the given user. + * + * @param username + * Who to make it for. + * @return Handle of the factory. + * @throws Exception + * If anything goes wrong. + */ + RemoteRunFactory make(String username) throws Exception; + + /** + * Shut down the meta-factory. It is not defined whether factories + * created by it are also shut down at the same time. + * + * @throws IOException + * If something goes wrong when communicating with the + * meta-factory. + * @throws InterruptedException + * If something stops us waiting for the shut down to + * happen. + */ + void close() throws IOException, InterruptedException; + + int lastStartupCheckCount(); + + Integer lastExitCode(); + } + + void registerFactory(String username, String fpn, RemoteRunFactory f) { + factoryProcessName.put(username, fpn); + factory.put(username, f); + } + + /** + * Makes the connection to the meta-factory that makes factories. + * + * @throws IOException + * If the connection fails. + */ + @PostConstruct + void initMetaFactory() throws IOException { + log.info("waiting for availability of default RMI registry"); + getTheRegistry(); + log.info("constructing secure fork subprocess"); + forker = new SecureFork(this, state, log); + } + + private void killForker() throws IOException, InterruptedException { + try { + if (forker != null) + forker.close(); + } finally { + forker = null; + } + } + + /** + * Makes the subprocess that manufactures runs. + * + * @throws Exception + * If anything goes wrong. + */ + private void initFactory(String username) throws Exception { + if (factory.containsKey(username)) + return; + if (forker == null) + initMetaFactory(); + forker.make(username); + } + + /** + * Destroys the subprocess that manufactures runs. + */ + @PreDestroy + public void killFactories() { + if (!factory.isEmpty()) { + Iterator<String> keys = factory.keySet().iterator(); + while (keys.hasNext()) { + String key = keys.next(); + log.info("requesting shutdown of " + + factoryProcessName.get(key)); + try { + factory.get(key).shutdown(); + } catch (RemoteException e) { + log.warn(factoryProcessName.get(key) + + " failed to shut down nicely", e); + } finally { + keys.remove(); + factoryProcessName.remove(key); + } + } + try { + sleep(700); + } catch (InterruptedException e) { + if (log.isDebugEnabled()) + log.debug("interrupted during wait after " + + "asking factories to shut down", e); + } + } + + try { + killForker(); + } catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("exception in shutdown of secure-fork process", e); + } + } + + @Override + protected void finalize() throws Throwable { + killFactories(); + super.finalize(); + } + + @Autowired + public void setIdMapper(LocalIdentityMapper mapper) { + this.mapper = mapper; + } + + private LocalIdentityMapper mapper; + + /** + * The real core of the run builder, factored out from its reliability + * support. + * + * @param creator + * Who created this workflow? + * @param username + * What user account is this workflow to be executed in? + * @param wf + * The serialized workflow. + * @return The remote handle of the workflow run. + * @throws RemoteException + * If anything fails (communications error, etc.) + */ + private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, + @Nonnull String username, @Nonnull byte[] wf, UUID id) + throws RemoteException { + String globaluser = "Unknown Person"; + if (creator != null) + globaluser = creator.getName(); + RemoteSingleRun rsr = factory.get(username).make(wf, globaluser, + makeURReciver(creator), id); + incrementRunCount(); + return rsr; + } + + @Override + protected RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception { + byte[] wf = serializeWorkflow(workflow); + String username = mapper == null ? null : mapper + .getUsernameForPrincipal(creator); + if (username == null) + throw new Exception("cannot determine who to run workflow as; " + + "local identity mapper returned null"); + for (int i = 0; i < 3; i++) { + if (!factory.containsKey(username)) + initFactory(username); + try { + return getRealRun(creator, username, wf, id); + } catch (ConnectException | ConnectIOException e) { + // factory was lost; try to recreate + } + factory.remove(username); + } + throw new NoCreateException("total failure to connect to factory " + + factoryProcessName + "despite attempting restart"); + } + + @Value("${secureForkPasswordFile}") + @Order(20) + public void setPasswordSource(String passwordSource) { + if (passwordSource == null || passwordSource.isEmpty() + || passwordSource.startsWith("${")) + state.setDefaultPasswordFile(null); + else + state.setDefaultPasswordFile(passwordSource); + if (state.getPasswordFile() == null) + log.info("assuming password-free forking enabled"); + else + log.info("configured secureForkPasswordFile from context as " + + state.getPasswordFile()); + } + + @Override + public String getFactoryProcessName() { + return "<PROPERTY-NOT-SUPPORTED>"; + } + + @Override + protected int operatingCount() throws Exception { + int total = 0; + for (RemoteRunFactory rrf : factory.values()) + total += rrf.countOperatingRuns(); + return total; + } +} + +/** + * The connector that handles the secure fork process itself. + * + * @author Donal Fellows + */ +class SecureFork implements IdAwareForkRunFactory.MetaFactory { + private IdAwareForkRunFactory main; + private Process process; + private PrintWriter channel; + private int lastStartupCheckCount; + private Integer lastExitCode; + private Log log; + private LocalWorkerState state; + private StreamLogger out, err; + + /** + * Construct the command to run the meta-factory process. + * + * @param args + * The live list of arguments to pass. + */ + public void initFactoryArgs(List<String> args) { + args.add(main.getJavaBinary()); + String pwf = main.getPasswordFile(); + if (pwf != null) { + args.add("-Dpassword.file=" + pwf); + } + args.add("-jar"); + args.add(main.getServerForkerJar()); + args.add(main.getJavaBinary()); + args.add("-jar"); + args.add(main.getServerWorkerJar()); + if (main.getExecuteWorkflowScript() == null) + log.fatal("no execute workflow script"); + args.add(main.getExecuteWorkflowScript()); + args.addAll(asList(main.getExtraArguments())); + } + + SecureFork(IdAwareForkRunFactory main, LocalWorkerState state, Log log) + throws IOException { + this.main = main; + this.log = log; + this.state = state; + ProcessBuilder p = new ProcessBuilder(); + initFactoryArgs(p.command()); + p.redirectErrorStream(true); + p.directory(new File(getProperty("javax.servlet.context.tempdir", + getProperty("java.io.tmpdir")))); + + // Spawn the subprocess + log.info("about to create subprocess: " + p.command()); + log.info("subprocess directory: " + p.directory()); + process = launchSubprocess(p); + channel = new PrintWriter(new BufferedWriter(new OutputStreamWriter( + process.getOutputStream())), true); + // Log the responses + out = new StreamLogger("ForkedStdout", process.getInputStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + err = new StreamLogger("ForkedStderr", process.getErrorStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + } + + @Override + public void close() throws IOException, InterruptedException { + try { + if (process != null) { + log.info("about to close down subprocess"); + channel.close(); + int code = -1; + try { + try { + code = process.exitValue(); + log.info("secure-fork process already dead?"); + } catch (IllegalThreadStateException e) { + try { + code = process.waitFor(); + } catch (InterruptedException e1) { + log.info("interrupted waiting for natural death of secure-fork process?!"); + process.destroy(); + code = process.waitFor(); + } + } + } finally { + lastExitCode = code; + if (code > 128) { + log.info("secure-fork process died with signal=" + + (code - 128)); + } else if (code >= 0) { + log.info("secure-fork process killed: code=" + code); + } else { + log.warn("secure-fork process not yet dead"); + } + } + } + } finally { + process = null; + channel = null; + out.stop(); + err.stop(); + } + } + + protected void make(String username, String fpn) { + log.info("about to request subprocess creation for " + username + + " producing ID " + fpn); + channel.println(username + " " + fpn); + } + + @Override + public RemoteRunFactory make(String username) throws Exception { + try { + main.getTheRegistry().list(); // Validate registry connection first + } catch (ConnectException | ConnectIOException e) { + log.warn("connection problems with registry", e); + } catch (RemoteException e) { + if (e.getCause() != null && e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } + log.warn("connection problems with registry", e); + } + + String fpn = state.getFactoryProcessNamePrefix() + randomUUID(); + make(username, fpn); + + // Wait for the subprocess to register itself in the RMI registry + Calendar deadline = Calendar.getInstance(); + deadline.add(SECOND, state.getWaitSeconds()); + Exception lastException = null; + lastStartupCheckCount = 0; + while (deadline.after(Calendar.getInstance())) { + try { + sleep(state.getSleepMS()); + lastStartupCheckCount++; + log.info("about to look up resource called " + fpn); + RemoteRunFactory f = (RemoteRunFactory) main.getTheRegistry() + .lookup(fpn); + log.info("successfully connected to factory subprocess " + fpn); + main.initInteractionDetails(f); + main.registerFactory(username, fpn, f); + return f; + } catch (InterruptedException ie) { + continue; + } catch (NotBoundException nbe) { + lastException = nbe; + log.info("resource \"" + fpn + "\" not yet registered..."); + continue; + } catch (RemoteException re) { + // Unpack a remote exception if we can + lastException = re; + try { + if (re.getCause() != null) + lastException = (Exception) re.getCause(); + } catch (Throwable t) { + // Ignore! + } + } catch (Exception e) { + lastException = e; + } + } + if (lastException == null) + lastException = new InterruptedException(); + throw lastException; + } + + @Override + public Integer lastExitCode() { + return lastExitCode; + } + + @Override + public int lastStartupCheckCount() { + return lastStartupCheckCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java new file mode 100644 index 0000000..f890204 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Provider of the configuration of the "localworker.factory" bean, which is + * sufficiently complex to be too hard to manufacture directly from the XML + * configuration. + * + * @author Donal Fellows + */ +@Configuration +public class LocalWorkerFactory { + @Bean(name = "localworker.factory") + AbstractRemoteRunFactory getLocalworkerFactory( + @Value("${backEndFactory}") String mode) throws Exception { + if (mode == null || mode.isEmpty() || mode.startsWith("${")) + throw new Exception("no value for ${backEndFactory}"); + Class<?> c = Class.forName(mode); + if (AbstractRemoteRunFactory.class.isAssignableFrom(c)) + return (AbstractRemoteRunFactory) c.newInstance(); + throw new Exception("unknown remote run factory: " + mode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java new file mode 100644 index 0000000..9f0f39b --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java @@ -0,0 +1,454 @@ +/* + * Copyright (C) 2010-2013 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import static java.io.File.separator; +import static java.lang.System.getProperty; +import static java.rmi.registry.Registry.REGISTRY_PORT; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static org.taverna.server.master.defaults.Default.EXTRA_ARGUMENTS; +import static org.taverna.server.master.defaults.Default.PASSWORD_FILE; +import static org.taverna.server.master.defaults.Default.REGISTRY_JAR; +import static org.taverna.server.master.defaults.Default.RMI_PREFIX; +import static org.taverna.server.master.defaults.Default.RUN_LIFE_MINUTES; +import static org.taverna.server.master.defaults.Default.RUN_OPERATING_LIMIT; +import static org.taverna.server.master.defaults.Default.SECURE_FORK_IMPLEMENTATION_JAR; +import static org.taverna.server.master.defaults.Default.SERVER_WORKER_IMPLEMENTATION_JAR; +import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_POLL_SLEEP; +import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_WAIT; +import static org.taverna.server.master.localworker.PersistedState.KEY; +import static org.taverna.server.master.localworker.PersistedState.makeInstance; + +import java.io.File; +import java.io.FilenameFilter; +import java.net.URI; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.jdo.annotations.PersistenceAware; + +import org.springframework.beans.factory.annotation.Required; +import org.taverna.server.master.common.Status; +import org.taverna.server.master.defaults.Default; +import org.taverna.server.master.utils.JDOSupport; +import org.taverna.server.master.worker.WorkerModel; + +/** + * The persistent state of a local worker factory. + * + * @author Donal Fellows + */ +@PersistenceAware +public class LocalWorkerState extends JDOSupport<PersistedState> implements + WorkerModel { + public LocalWorkerState() { + super(PersistedState.class); + } + + private LocalWorkerState self; + + @Required + public void setSelf(LocalWorkerState self) { + this.self = self; + } + + /** Initial lifetime of runs, in minutes. */ + int defaultLifetime; + /** + * Maximum number of runs to exist at once. Note that this includes when + * they are just existing for the purposes of file transfer ( + * {@link Status#Initialized}/{@link Status#Finished} states). + */ + int maxRuns; + /** + * Prefix to use for RMI names. + */ + String factoryProcessNamePrefix; + /** + * Full path name of the script used to start running a workflow; normally + * expected to be "<i>somewhere/</i><tt>executeWorkflow.sh</tt>". + */ + String executeWorkflowScript; + /** Default value for {@link #executeWorkflowScript}. */ + private transient String defaultExecuteWorkflowScript; + /** + * Full path name of the file containing the password used to launch workers + * as other users. The file is normally expected to contain a single line, + * the password, and to be thoroughly locked down so only the user running + * the server (e.g., "<tt>tomcat</tt>") can read it; it will probably reside + * in either the user's home directory or in a system configuration + * directory. + */ + String passwordFile; + /** Default value for {@link #passwordFile}. */ + private transient String defaultPasswordFile = PASSWORD_FILE; + /** + * The extra arguments to pass to the subprocess. + */ + String[] extraArgs; + /** + * How long to wait for subprocess startup, in seconds. + */ + int waitSeconds; + /** + * Polling interval to use during startup, in milliseconds. + */ + int sleepMS; + /** + * Full path name to the worker process's implementation JAR. + */ + String serverWorkerJar; + private static final String DEFAULT_WORKER_JAR = LocalWorkerState.class + .getClassLoader().getResource(SERVER_WORKER_IMPLEMENTATION_JAR) + .getFile(); + /** + * Full path name to the Java binary to use to run the subprocess. + */ + String javaBinary; + private static final String DEFAULT_JAVA_BINARY = getProperty("java.home") + + separator + "bin" + separator + "java"; + /** + * Full path name to the secure fork process's implementation JAR. + */ + String serverForkerJar; + private static final String DEFAULT_FORKER_JAR = LocalWorkerState.class + .getClassLoader().getResource(SECURE_FORK_IMPLEMENTATION_JAR) + .getFile(); + + String registryHost; + int registryPort; + + int operatingLimit; + + URI[] permittedWorkflows; + private String registryJar; + private static final String DEFAULT_REGISTRY_JAR = LocalWorkerState.class + .getClassLoader().getResource(REGISTRY_JAR).getFile(); + + @Override + public void setDefaultLifetime(int defaultLifetime) { + this.defaultLifetime = defaultLifetime; + if (loadedState) + self.store(); + } + + @Override + public int getDefaultLifetime() { + return defaultLifetime < 1 ? RUN_LIFE_MINUTES : defaultLifetime; + } + + @Override + public void setMaxRuns(int maxRuns) { + this.maxRuns = maxRuns; + if (loadedState) + self.store(); + } + + @Override + public int getMaxRuns() { + return maxRuns < 1 ? Default.RUN_COUNT_MAX : maxRuns; + } + + @Override + public int getOperatingLimit() { + return operatingLimit < 1 ? RUN_OPERATING_LIMIT : operatingLimit; + } + + @Override + public void setOperatingLimit(int operatingLimit) { + this.operatingLimit = operatingLimit; + if (loadedState) + self.store(); + } + + @Override + public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) { + this.factoryProcessNamePrefix = factoryProcessNamePrefix; + if (loadedState) + self.store(); + } + + @Override + public String getFactoryProcessNamePrefix() { + return factoryProcessNamePrefix == null ? RMI_PREFIX + : factoryProcessNamePrefix; + } + + @Override + public void setExecuteWorkflowScript(String executeWorkflowScript) { + this.executeWorkflowScript = executeWorkflowScript; + if (loadedState) + self.store(); + } + + @Override + public String getExecuteWorkflowScript() { + return executeWorkflowScript == null ? defaultExecuteWorkflowScript + : executeWorkflowScript; + } + + private static String guessWorkflowScript() { + File utilDir = new File(DEFAULT_WORKER_JAR).getParentFile(); + File[] dirs = utilDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("taverna-commandline-"); + } + }); + assert dirs.length > 0; + return new File(dirs[0], "executeworkflow.sh").toString(); + } + + /** + * Set what executeworkflow script to use by default. This is the value that + * is used if not overridden by the administration interface. + * + * @param defaultScript + * Full path to the script to use. + */ + public void setDefaultExecuteWorkflowScript(String defaultScript) { + if (defaultScript.startsWith("${") || defaultScript.equals("NONE")) { + this.defaultExecuteWorkflowScript = guessWorkflowScript(); + return; + } + this.defaultExecuteWorkflowScript = defaultScript; + } + + String getDefaultExecuteWorkflowScript() { + return defaultExecuteWorkflowScript; + } + + @Override + public void setExtraArgs(String[] extraArgs) { + this.extraArgs = extraArgs.clone(); + if (loadedState) + self.store(); + } + + @Override + public String[] getExtraArgs() { + return extraArgs == null ? EXTRA_ARGUMENTS : extraArgs.clone(); + } + + @Override + public void setWaitSeconds(int waitSeconds) { + this.waitSeconds = waitSeconds; + if (loadedState) + self.store(); + } + + @Override + public int getWaitSeconds() { + return waitSeconds < 1 ? SUBPROCESS_START_WAIT : waitSeconds; + } + + @Override + public void setSleepMS(int sleepMS) { + this.sleepMS = sleepMS; + if (loadedState) + self.store(); + } + + @Override + public int getSleepMS() { + return sleepMS < 1 ? SUBPROCESS_START_POLL_SLEEP : sleepMS; + } + + @Override + public void setServerWorkerJar(String serverWorkerJar) { + this.serverWorkerJar = serverWorkerJar; + if (loadedState) + self.store(); + } + + @Override + public String getServerWorkerJar() { + return serverWorkerJar == null ? DEFAULT_WORKER_JAR : serverWorkerJar; + } + + @Override + public void setServerForkerJar(String serverForkerJar) { + this.serverForkerJar = serverForkerJar; + if (loadedState) + self.store(); + } + + @Override + public String getServerForkerJar() { + return serverForkerJar == null ? DEFAULT_FORKER_JAR : serverForkerJar; + } + + @Override + public void setJavaBinary(String javaBinary) { + this.javaBinary = javaBinary; + if (loadedState) + self.store(); + } + + @Override + public String getJavaBinary() { + return javaBinary == null ? DEFAULT_JAVA_BINARY : javaBinary; + } + + @Override + public void setPasswordFile(String passwordFile) { + this.passwordFile = passwordFile; + if (loadedState) + self.store(); + } + + @Override + public String getPasswordFile() { + return passwordFile == null ? defaultPasswordFile : passwordFile; + } + + void setDefaultPasswordFile(String defaultPasswordFile) { + this.defaultPasswordFile = defaultPasswordFile; + } + + @Override + public void setRegistryHost(String registryHost) { + this.registryHost = (registryHost == null ? "" : registryHost); + if (loadedState) + self.store(); + } + + @Override + public String getRegistryHost() { + return (registryHost == null || registryHost.isEmpty()) ? null + : registryHost; + } + + @Override + public void setRegistryPort(int registryPort) { + this.registryPort = ((registryPort < 1 || registryPort > 65534) ? REGISTRY_PORT + : registryPort); + if (loadedState) + self.store(); + } + + @Override + public int getRegistryPort() { + return registryPort == 0 ? REGISTRY_PORT : registryPort; + } + + @Override + public String getRegistryJar() { + return registryJar == null ? DEFAULT_REGISTRY_JAR : registryJar; + } + + @Override + public void setRegistryJar(String rmiRegistryJar) { + this.registryJar = (rmiRegistryJar == null || rmiRegistryJar.isEmpty()) ? null + : rmiRegistryJar; + if (loadedState) + self.store(); + } + + @Override + public List<URI> getPermittedWorkflowURIs() { + if (permittedWorkflows == null || permittedWorkflows.length == 0) + return emptyList(); + return unmodifiableList(asList(permittedWorkflows)); + } + + @Override + public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) { + if (permittedWorkflows == null || permittedWorkflows.isEmpty()) + this.permittedWorkflows = new URI[0]; + else + this.permittedWorkflows = permittedWorkflows + .toArray(new URI[permittedWorkflows.size()]); + if (loadedState) + self.store(); + } + + public static final boolean DEFAULT_GENERATE_PROVENANCE = false; + private Boolean generateProvenance; + + @Override + public boolean getGenerateProvenance() { + Boolean g = generateProvenance; + return g == null ? DEFAULT_GENERATE_PROVENANCE : (boolean) g; + } + + @Override + public void setGenerateProvenance(boolean generate) { + this.generateProvenance = generate; + if (loadedState) + self.store(); + } + + // -------------------------------------------------------------- + + private boolean loadedState; + + @PostConstruct + @WithinSingleTransaction + public void load() { + if (loadedState || !isPersistent()) + return; + WorkerModel state = getById(KEY); + if (state == null) { + store(); + return; + } + + defaultLifetime = state.getDefaultLifetime(); + executeWorkflowScript = state.getExecuteWorkflowScript(); + extraArgs = state.getExtraArgs(); + factoryProcessNamePrefix = state.getFactoryProcessNamePrefix(); + javaBinary = state.getJavaBinary(); + maxRuns = state.getMaxRuns(); + serverWorkerJar = state.getServerWorkerJar(); + serverForkerJar = state.getServerForkerJar(); + passwordFile = state.getPasswordFile(); + sleepMS = state.getSleepMS(); + waitSeconds = state.getWaitSeconds(); + registryHost = state.getRegistryHost(); + registryPort = state.getRegistryPort(); + operatingLimit = state.getOperatingLimit(); + List<URI> pwu = state.getPermittedWorkflowURIs(); + permittedWorkflows = (URI[]) pwu.toArray(new URI[pwu.size()]); + registryJar = state.getRegistryJar(); + generateProvenance = state.getGenerateProvenance(); + + loadedState = true; + } + + @WithinSingleTransaction + public void store() { + if (!isPersistent()) + return; + WorkerModel state = getById(KEY); + if (state == null) + state = persist(makeInstance()); + + state.setDefaultLifetime(defaultLifetime); + state.setExecuteWorkflowScript(executeWorkflowScript); + state.setExtraArgs(extraArgs); + state.setFactoryProcessNamePrefix(factoryProcessNamePrefix); + state.setJavaBinary(javaBinary); + state.setMaxRuns(maxRuns); + state.setServerWorkerJar(serverWorkerJar); + state.setServerForkerJar(serverForkerJar); + state.setPasswordFile(passwordFile); + state.setSleepMS(sleepMS); + state.setWaitSeconds(waitSeconds); + state.setRegistryHost(registryHost); + state.setRegistryPort(registryPort); + state.setOperatingLimit(operatingLimit); + if (permittedWorkflows != null) + state.setPermittedWorkflowURIs(asList(permittedWorkflows)); + state.setRegistryJar(registryJar); + if (generateProvenance != null) + state.setGenerateProvenance(generateProvenance); + + loadedState = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java new file mode 100644 index 0000000..83d6bda --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java @@ -0,0 +1,257 @@ +/* + * Copyright (C) 2010-2013 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.localworker; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.jdo.annotations.Join; +import javax.jdo.annotations.PersistenceCapable; +import javax.jdo.annotations.Persistent; +import javax.jdo.annotations.PrimaryKey; + +import org.taverna.server.master.worker.WorkerModel; + +/** + * The actual database connector for persisted local worker state. + * + * @author Donal Fellows + */ +/* + * WARNING! If you change the name of this class, update persistence.xml as + * well! + */ +@PersistenceCapable(table = PersistedState.TABLE) +class PersistedState implements WorkerModel { + static final String TABLE = "LOCALWORKERSTATE__PERSISTEDSTATE"; + + static PersistedState makeInstance() { + PersistedState o = new PersistedState(); + o.ID = KEY; + return o; + } + + @PrimaryKey(column = "ID") + protected int ID; + + static final int KEY = 32; + + @Persistent + private int defaultLifetime; + @Persistent + private int maxRuns; + @Persistent + private String factoryProcessNamePrefix; + @Persistent + private String executeWorkflowScript; + @Persistent(serialized = "true") + private String[] extraArgs; + @Persistent + private int waitSeconds; + @Persistent + private int sleepMS; + @Persistent + private String serverWorkerJar; + @Persistent + private String serverForkerJar; + @Persistent + private String registryJar; + @Persistent + private String passwordFile; + @Persistent + private String javaBinary; + @Persistent + private int registryPort; + @Persistent + private String registryHost; + @Persistent + private int operatingLimit; + @Persistent(defaultFetchGroup = "true") + @Join(table = TABLE + "_PERMWFURI", column = "ID") + private String[] permittedWorkflows; + @Persistent + private int generateProvenance; + + @Override + public void setDefaultLifetime(int defaultLifetime) { + this.defaultLifetime = defaultLifetime; + } + + @Override + public int getDefaultLifetime() { + return defaultLifetime; + } + + @Override + public void setMaxRuns(int maxRuns) { + this.maxRuns = maxRuns; + } + + @Override + public int getMaxRuns() { + return maxRuns; + } + + @Override + public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) { + this.factoryProcessNamePrefix = factoryProcessNamePrefix; + } + + @Override + public String getFactoryProcessNamePrefix() { + return factoryProcessNamePrefix; + } + + @Override + public void setExecuteWorkflowScript(String executeWorkflowScript) { + this.executeWorkflowScript = executeWorkflowScript; + } + + @Override + public String getExecuteWorkflowScript() { + return executeWorkflowScript; + } + + @Override + public void setExtraArgs(String[] extraArgs) { + this.extraArgs = extraArgs; + } + + @Override + public String[] getExtraArgs() { + return extraArgs; + } + + @Override + public void setWaitSeconds(int waitSeconds) { + this.waitSeconds = waitSeconds; + } + + @Override + public int getWaitSeconds() { + return waitSeconds; + } + + @Override + public void setSleepMS(int sleepMS) { + this.sleepMS = sleepMS; + } + + @Override + public int getSleepMS() { + return sleepMS; + } + + @Override + public void setServerWorkerJar(String serverWorkerJar) { + this.serverWorkerJar = serverWorkerJar; + } + + @Override + public String getServerWorkerJar() { + return serverWorkerJar; + } + + @Override + public void setJavaBinary(String javaBinary) { + this.javaBinary = javaBinary; + } + + @Override + public String getJavaBinary() { + return javaBinary; + } + + @Override + public void setRegistryPort(int registryPort) { + this.registryPort = registryPort; + } + + @Override + public int getRegistryPort() { + return registryPort; + } + + @Override + public void setRegistryHost(String registryHost) { + this.registryHost = registryHost; + } + + @Override + public String getRegistryHost() { + return registryHost; + } + + @Override + public void setServerForkerJar(String serverForkerJar) { + this.serverForkerJar = serverForkerJar; + } + + @Override + public String getServerForkerJar() { + return serverForkerJar; + } + + @Override + public void setPasswordFile(String passwordFile) { + this.passwordFile = passwordFile; + } + + @Override + public String getPasswordFile() { + return passwordFile; + } + + @Override + public void setOperatingLimit(int operatingLimit) { + this.operatingLimit = operatingLimit; + } + + @Override + public int getOperatingLimit() { + return operatingLimit; + } + + @Override + public List<URI> getPermittedWorkflowURIs() { + String[] pw = this.permittedWorkflows; + if (pw == null) + return new ArrayList<>(); + List<URI> uris = new ArrayList<>(pw.length); + for (String uri : pw) + uris.add(URI.create(uri)); + return uris; + } + + @Override + public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) { + String[] pw = new String[permittedWorkflows.size()]; + for (int i = 0; i < pw.length; i++) + pw[i] = permittedWorkflows.get(i).toString(); + this.permittedWorkflows = pw; + } + + @Override + public String getRegistryJar() { + return registryJar; + } + + @Override + public void setRegistryJar(String registryJar) { + this.registryJar = registryJar; + } + + @Override + public boolean getGenerateProvenance() { + return generateProvenance > 0; + } + + @Override + public void setGenerateProvenance(boolean generateProvenance) { + this.generateProvenance = (generateProvenance ? 1 : 0); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java new file mode 100644 index 0000000..f361e17 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java @@ -0,0 +1,62 @@ +package org.taverna.server.master.localworker; + +import static java.lang.Thread.interrupted; +import static org.apache.commons.logging.LogFactory.getLog; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.commons.logging.Log; + +abstract class StreamLogger { + protected final Log log; + private Thread t; + private InputStream in; + + protected StreamLogger(final String name, InputStream is) { + log = getLog("Taverna.Server.LocalWorker." + name); + in = is; + t = new Thread(new Runnable() { + @Override + public void run() { + try (BufferedReader br = new BufferedReader( + new InputStreamReader(in))) { + String line; + while (!interrupted() && (line = br.readLine()) != null) + if (!line.isEmpty()) + write(line); + } catch (IOException e) { + // Do nothing... + } catch (Exception e) { + log.warn("failure in reading from " + name, e); + } + } + }, name + ".StreamLogger"); + t.setContextClassLoader(null); + t.setDaemon(true); + t.start(); + } + + /** + * Write a line read from the subprocess to the log. + * <p> + * This needs to be implemented by subclasses in order for the log to be + * correctly written with the class name. + * + * @param msg + * The message to write. Guaranteed to have no newline characters + * in it and to be non-empty. + */ + protected abstract void write(String msg); + + public void stop() { + log.info("trying to close down " + t.getName()); + t.interrupt(); + try { + in.close(); + } catch (IOException e) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java new file mode 100644 index 0000000..7139dd7 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +/** + * Implementation of a Taverna Server back-end that works by forking off + * workflow executors on the local system. + */ +package org.taverna.server.master.localworker; http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java b/taverna-server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java new file mode 100644 index 0000000..3e27806 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.master.notification; + +import static javax.ws.rs.core.MediaType.TEXT_PLAIN; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Required; +import org.springframework.mail.MailSender; +import org.springframework.mail.SimpleMailMessage; +import org.springframework.mail.javamail.JavaMailSender; + +/** + * How to send a plain text message by email to someone. + * + * @author Donal Fellows + */ +public class EmailDispatcher extends RateLimitedDispatcher { + @Override + public String getName() { + return "mailto"; + } + + /** + * @param from + * Email address that the notification is to come from. + */ + @Required + public void setFrom(String from) { + this.from = valid(from, ""); + } + + /** + * @param host + * The outgoing SMTP server address. + */ + @Required + public void setSmtpHost(String host) { + this.host = valid(host, ""); + } + + /** + * @param contentType + * The content type of the message to be sent. For example, " + * <tt>text/plain</tt>". + */ + public void setMessageContentType(String contentType) { + this.contentType = contentType; + } + + /** + * @param sender + * the sender to set + */ + public void setSender(MailSender sender) { + this.sender = sender; + } + + private String from; + private String host; + private MailSender sender; + @SuppressWarnings("unused") + private String contentType = TEXT_PLAIN; + + /** + * Try to perform the lookup of the email service. This is called during + * configuration so that any failure happens at a useful, predictable time. + */ + @PostConstruct + public void tryLookup() { + if (!isAvailable()) { + log.warn("no mail support; disabling email dispatch"); + sender = null; + return; + } + try { + if (sender instanceof JavaMailSender) + ((JavaMailSender) sender).createMimeMessage(); + } catch (Throwable t) { + log.warn("sender having problems constructing messages; " + + "disabling...", t); + sender = null; + } + } + + @Override + public void dispatch(String messageSubject, String messageContent, String to) + throws Exception { + // Simple checks for acceptability + if (!to.matches(".+@.+")) { + log.info("did not send email notification: improper email address \"" + + to + "\""); + return; + } + + SimpleMailMessage message = new SimpleMailMessage(); + message.setFrom(from); + message.setTo(to.trim()); + message.setSubject(messageSubject); + message.setText(messageContent); + sender.send(message); + } + + @Override + public boolean isAvailable() { + return (host != null && !host.isEmpty() && sender != null + && from != null && !from.isEmpty()); + } +}
