http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java b/server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java deleted file mode 100644 index b227bfa..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/interfaces/TavernaSecurityContext.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright (C) 2010-2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.interfaces; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.security.Principal; -import java.util.Set; - -import javax.ws.rs.core.HttpHeaders; -import javax.xml.ws.handler.MessageContext; - -import org.springframework.security.core.context.SecurityContext; -import org.taverna.server.localworker.remote.ImplementationException; -import org.taverna.server.master.common.Credential; -import org.taverna.server.master.common.Trust; -import org.taverna.server.master.exceptions.InvalidCredentialException; -import org.taverna.server.master.utils.UsernamePrincipal; - -/** - * Security context for a workflow run. - * - * @author Donal Fellows - */ -public interface TavernaSecurityContext { - /** - * @return Who owns the security context. - */ - UsernamePrincipal getOwner(); - - /** - * Describe the names of the users (as extracted from their - * {@link Principal} objects) that may destroy the run or manipulate its - * lifetime. - * - * @return The names of the users who may use destroy operations. Read-only. - */ - Set<String> getPermittedDestroyers(); - - /** - * Sets the collection of names of users (as extracted from their - * {@link Principal} objects) that may destroy the run or manipulate its - * lifetime. - * - * @param destroyers - * The names of the users who may use destroy operations. - */ - void setPermittedDestroyers(Set<String> destroyers); - - /** - * Describe the names of the users (as extracted from their - * {@link Principal} objects) that may update the run (including writing to - * files). - * - * @return The names of the users who may use update operations. Read-only. - */ - Set<String> getPermittedUpdaters(); - - /** - * Sets the collection of names of users (as extracted from their - * {@link Principal} objects) that may update the run (including writing to - * its files). - * - * @param updaters - * The names of the users who may use update operations. - */ - void setPermittedUpdaters(Set<String> updaters); - - /** - * Describe the names of the users (as extracted from their - * {@link Principal} objects) that may read from the run (including its - * files). - * - * @return The names of the users who may use read operations. Read-only. - */ - Set<String> getPermittedReaders(); - - /** - * Sets the collection of names of users (as extracted from their - * {@link Principal} objects) that may read from the run (including its - * files). - * - * @param readers - * The names of the users who may use read operations. - */ - void setPermittedReaders(Set<String> readers); - - /** - * @return The credentials owned by the user. Never <tt>null</tt>. - */ - Credential[] getCredentials(); - - /** - * Add a credential to the owned set or replaces the old version with the - * new one. - * - * @param toAdd - * The credential to add. - */ - void addCredential(Credential toAdd); - - /** - * Remove a credential from the owned set. It's not a failure to remove - * something that isn't in the set. - * - * @param toDelete - * The credential to remove. - */ - void deleteCredential(Credential toDelete); - - /** - * Tests if the credential is valid. This includes testing whether the - * underlying credential file exists and can be unlocked by the password in - * the {@link Credential} object. - * - * @param c - * The credential object to validate. - * @throws InvalidCredentialException - * If it is invalid. - */ - void validateCredential(Credential c) throws InvalidCredentialException; - - /** - * @return The identities trusted by the user. Never <tt>null</tt>. - */ - Trust[] getTrusted(); - - /** - * Add an identity to the trusted set. - * - * @param toAdd - * The identity to add. - */ - void addTrusted(Trust toAdd); - - /** - * Remove an identity from the trusted set. It's not a failure to remove - * something that isn't in the set. - * - * @param toDelete - * The identity to remove. - */ - void deleteTrusted(Trust toDelete); - - /** - * Tests if the trusted identity descriptor is valid. This includes checking - * whether the underlying trusted identity file exists. - * - * @param t - * The trusted identity descriptor to check. - * @throws InvalidCredentialException - * If it is invalid. - */ - void validateTrusted(Trust t) throws InvalidCredentialException; - - /** - * Establish the security context from how the owning workflow run was - * created. In particular, this gives an opportunity for boot-strapping - * things with any delegateable credentials. - * - * @param securityContext - * The security context associated with the request that caused - * the workflow to be created. - * @throws Exception - * If anything goes wrong. - */ - void initializeSecurityFromContext(SecurityContext securityContext) - throws Exception; - - /** - * Establish the security context from how the owning workflow run was - * created. In particular, this gives an opportunity for boot-strapping - * things with any delegateable credentials. - * - * @param context - * The full information about the request that caused the - * workflow to be created. - */ - void initializeSecurityFromSOAPContext(MessageContext context); - - /** - * Establish the security context from how the owning workflow run was - * created. In particular, this gives an opportunity for boot-strapping - * things with any delegateable credentials. - * - * @param headers - * The full information about the request that caused the - * workflow to be created. - */ - void initializeSecurityFromRESTContext(HttpHeaders headers); - - /** - * Transfer the security context to the remote system. - * - * @throws IOException - * If the communication fails. - * @throws GeneralSecurityException - * If the assembly of the context fails. - * @throws ImplementationException - * If the local worker has problems with creating the realized - * security context. - */ - void conveySecurity() throws GeneralSecurityException, IOException, - ImplementationException; - - /** - * @return The factory that created this security context. - */ - SecurityContextFactory getFactory(); -}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java b/server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java deleted file mode 100644 index e5cd02c..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/interfaces/UriBuilderFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (C) 2010-2013 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.interfaces; - -import java.net.URI; - -import javax.ws.rs.core.UriBuilder; - -/** - * How to manufacture URIs to workflow runs. - * - * @author Donal Fellows - */ -public interface UriBuilderFactory { - /** - * Given a run, get a factory for RESTful URIs to resources associated - * with it. - * - * @param run - * The run in question. - * @return The {@link URI} factory. - */ - UriBuilder getRunUriBuilder(TavernaRun run); - - /** - * @return a URI factory that is preconfigured to point to the base of - * the webapp. - */ - UriBuilder getBaseUriBuilder(); - - /** - * Resolves a URI with respect to the base URI of the factory. - * - * @param uri - * The URI to resolve, or <tt>null</tt>. - * @return The resolved URI, or <tt>null</tt> if <b>uri</b> is - * <tt>null</tt>. - */ - String resolve(String uri); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java b/server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java deleted file mode 100644 index cfbbe79..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/interfaces/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright (C) 2010-2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -/** - * Interfaces to the main worker classes that provide the magical power - * that drives the webapp front-end. - */ -package org.taverna.server.master.interfaces; http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java deleted file mode 100644 index fc7f881..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/AbstractRemoteRunFactory.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Copyright (C) 2010-2013 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import static java.lang.System.getSecurityManager; -import static java.lang.System.setProperty; -import static java.lang.System.setSecurityManager; -import static java.rmi.registry.LocateRegistry.createRegistry; -import static java.rmi.registry.LocateRegistry.getRegistry; -import static java.rmi.registry.Registry.REGISTRY_PORT; -import static java.util.UUID.randomUUID; -import static org.taverna.server.master.TavernaServer.JMX_ROOT; -import static org.taverna.server.master.rest.TavernaServerRunREST.PathNames.DIR; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.net.URI; -import java.net.URL; -import java.rmi.MarshalledObject; -import java.rmi.RMISecurityManager; -import java.rmi.RemoteException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.Resource; -import javax.xml.bind.JAXBException; - -import org.apache.commons.io.IOUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jmx.export.annotation.ManagedResource; -import org.taverna.server.localworker.remote.RemoteRunFactory; -import org.taverna.server.localworker.remote.RemoteSingleRun; -import org.taverna.server.localworker.server.UsageRecordReceiver; -import org.taverna.server.master.common.Workflow; -import org.taverna.server.master.exceptions.NoCreateException; -import org.taverna.server.master.exceptions.NoListenerException; -import org.taverna.server.master.factories.ListenerFactory; -import org.taverna.server.master.factories.RunFactory; -import org.taverna.server.master.interaction.InteractionFeedSupport; -import org.taverna.server.master.interfaces.Listener; -import org.taverna.server.master.interfaces.SecurityContextFactory; -import org.taverna.server.master.interfaces.TavernaRun; -import org.taverna.server.master.interfaces.UriBuilderFactory; -import org.taverna.server.master.notification.atom.EventDAO; -import org.taverna.server.master.usage.UsageRecordRecorder; -import org.taverna.server.master.utils.UsernamePrincipal; -import org.taverna.server.master.worker.FactoryBean; -import org.taverna.server.master.worker.RemoteRunDelegate; -import org.taverna.server.master.worker.RunFactoryConfiguration; - -import uk.org.taverna.scufl2.api.io.WriterException; - -/** - * Bridge to remote runs via RMI. - * - * @author Donal Fellows - */ -@ManagedResource(objectName = JMX_ROOT + "Factory", description = "The factory for runs") -public abstract class AbstractRemoteRunFactory extends RunFactoryConfiguration - implements ListenerFactory, RunFactory, FactoryBean { - /** - * Whether to apply stronger limitations than normal to RMI. It is - * recommended that this be true! - */ - @Value("${rmi.localhostOnly}") - private boolean rmiLocalhostOnly; - /** The interaction host name. */ - private String interhost; - /** The interaction port number. */ - private String interport; - private Process registryProcess; - /** - * The interaction WebDAV location. Will be resolved before being passed to - * the back-end. - */ - private String interwebdav; - /** - * The interaction ATOM feed location. Will be resolved before being passed - * to the back-end. - */ - private String interfeed; - /** Used for doing URI resolution. */ - @Resource(name = "webapp") - private UriBuilderFactory baseurifactory; - @Autowired - private InteractionFeedSupport interactionFeedSupport; - - @Value("${taverna.interaction.host}") - void setInteractionHost(String host) { - if (host != null && host.equals("none")) - host = null; - interhost = host; - } - - @Value("${taverna.interaction.port}") - void setInteractionPort(String port) { - if (port != null && port.equals("none")) - port = null; - interport = port; - } - - @Value("${taverna.interaction.webdav_path}") - void setInteractionWebdav(String webdav) { - if (webdav != null && webdav.equals("none")) - webdav = null; - interwebdav = webdav; - } - - @Value("${taverna.interaction.feed_path}") - void setInteractionFeed(String feed) { - if (feed != null && feed.equals("none")) - feed = null; - interfeed = feed; - } - - @Override - protected void reinitRegistry() { - registry = null; - if (registryProcess != null) { - registryProcess.destroy(); - registryProcess = null; - } - } - - protected void initInteractionDetails(RemoteRunFactory factory) - throws RemoteException { - if (interhost != null) { - String feed = baseurifactory.resolve(interfeed); - String webdav = baseurifactory.resolve(interwebdav); - factory.setInteractionServiceDetails(interhost, interport, webdav, - feed); - } - } - - protected static final Process launchSubprocess(ProcessBuilder b) - throws IOException { - Thread t = Thread.currentThread(); - ClassLoader ccl = t.getContextClassLoader(); - try { - t.setContextClassLoader(null); - return b.start(); - } finally { - t.setContextClassLoader(ccl); - } - } - - /** Get a handle to a new instance of the RMI registry. */ - private Registry makeRegistry(int port) throws RemoteException { - ProcessBuilder p = new ProcessBuilder(getJavaBinary()); - p.command().add("-jar"); - p.command().add(getRmiRegistryJar()); - p.command().add(Integer.toString(port)); - p.command().add(Boolean.toString(rmiLocalhostOnly)); - try { - Process proc = launchSubprocess(p); - Thread.sleep(getSleepTime()); - try { - if (proc.exitValue() == 0) - return null; - String error = IOUtils.toString(proc.getErrorStream()); - throw new RemoteException(error); - } catch (IllegalThreadStateException ise) { - // Still running! - } - try (ObjectInputStream ois = new ObjectInputStream( - proc.getInputStream())) { - @SuppressWarnings("unchecked") - Registry r = ((MarshalledObject<Registry>) ois.readObject()) - .get(); - registryProcess = proc; - return r; - } - } catch (RemoteException e) { - throw e; - } catch (ClassNotFoundException e) { - throw new RemoteException("unexpected registry type", e); - } catch (IOException e) { - throw new RemoteException("unexpected IO problem with registry", e); - } catch (InterruptedException e) { - throw new RemoteException("unexpected interrupt"); - } - } - - /** - * @return A handle to the current RMI registry. - */ - protected Registry getTheRegistry() { - try { - if (registry != null) { - registry.list(); - return registry; - } - } catch (RemoteException e) { - log.warn("non-functioning existing registry handle", e); - registry = null; - } - try { - registry = getRegistry(getRegistryHost(), getRegistryPort()); - registry.list(); - return registry; - } catch (RemoteException e) { - log.warn("Failed to get working RMI registry handle."); - registry = null; - log.warn("Will build new registry, " - + "but service restart ability is at risk."); - try { - registry = makeRegistry(getRegistryPort()); - registry.list(); - return registry; - } catch (RemoteException e2) { - log.error( - "failed to create local working RMI registry on port " - + getRegistryPort(), e2); - log.info("original connection exception", e); - } - } - try { - registry = getRegistry(getRegistryHost(), REGISTRY_PORT); - registry.list(); - return registry; - } catch (RemoteException e) { - log.warn("Failed to get working RMI registry handle on backup port."); - try { - registry = makeRegistry(REGISTRY_PORT); - registry.list(); - return registry; - } catch (RemoteException e2) { - log.fatal( - "totally failed to get registry handle, even on fallback!", - e2); - log.info("original connection exception", e); - registry = null; - throw new RuntimeException("No RMI Registry Available"); - } - } - } - - private Registry registry; - /** - * The name of the resource that describes the default security policy to - * install. - */ - public static final String SECURITY_POLICY_FILE = "security.policy"; - private SecurityContextFactory securityFactory; - UsageRecordRecorder usageRecordSink; - private EventDAO masterEventFeed; - - @Autowired(required = true) - void setSecurityContextFactory(SecurityContextFactory factory) { - this.securityFactory = factory; - } - - @Autowired(required = true) - void setMasterEventFeed(EventDAO masterEventFeed) { - this.masterEventFeed = masterEventFeed; - } - - @Autowired(required = true) - void setUsageRecordSink(UsageRecordRecorder usageRecordSink) { - this.usageRecordSink = usageRecordSink; - } - - /** - * Configures the Java security model. Not currently used, as it is - * viciously difficult to get right! - */ - @SuppressWarnings("unused") - private static void installSecurityManager() { - if (getSecurityManager() == null) { - setProperty("java.security.policy", AbstractRemoteRunFactory.class - .getClassLoader().getResource(SECURITY_POLICY_FILE) - .toExternalForm()); - setSecurityManager(new RMISecurityManager()); - } - } - - // static { - // installSecurityManager(); - // } - - /** - * Set up the run expiry management engine. - * - * @throws JAXBException - */ - public AbstractRemoteRunFactory() throws JAXBException { - try { - registry = LocateRegistry.getRegistry(); - registry.list(); - } catch (RemoteException e) { - log.warn("Failed to get working RMI registry handle."); - log.warn("Will build new registry, but service restart ability is at risk."); - try { - registry = createRegistry(REGISTRY_PORT); - registry.list(); - } catch (RemoteException e2) { - log.error("failed to create working RMI registry", e2); - log.info("original connection exception", e); - } - } - } - - @Override - public List<String> getSupportedListenerTypes() { - try { - RemoteRunDelegate rrd = runDB.pickArbitraryRun(); - if (rrd != null) - return rrd.getListenerTypes(); - log.warn("no remote runs; no listener types"); - } catch (Exception e) { - log.warn("failed to get list of listener types", e); - } - return new ArrayList<>(); - } - - @Override - public Listener makeListener(TavernaRun run, String listenerType, - String configuration) throws NoListenerException { - if (run instanceof RemoteRunDelegate) - return ((RemoteRunDelegate) run).makeListener(listenerType, - configuration); - throw new NoListenerException("unexpected run type: " + run.getClass()); - } - - @Override - public TavernaRun create(UsernamePrincipal creator, Workflow workflow) - throws NoCreateException { - try { - Date now = new Date(); - UUID id = randomUUID(); - RemoteSingleRun rsr = getRealRun(creator, workflow, id); - RemoteRunDelegate run = new RemoteRunDelegate(now, workflow, rsr, - state.getDefaultLifetime(), runDB, id, - state.getGenerateProvenance(), this); - run.setSecurityContext(securityFactory.create(run, creator)); - @Nonnull - URI feed = interactionFeedSupport.getFeedURI(run); - @Nonnull - URL feedUrl = feed.toURL(); - @Nonnull - URL webdavUrl = baseurifactory.getRunUriBuilder(run) - .path(DIR + "/interactions").build().toURL(); - @Nullable - URL pub = interactionFeedSupport.getLocalFeedBase(feed); - rsr.setInteractionServiceDetails(feedUrl, webdavUrl, pub); - return run; - } catch (NoCreateException e) { - log.warn("failed to build run instance", e); - throw e; - } catch (Exception e) { - log.warn("failed to build run instance", e); - throw new NoCreateException("failed to build run instance", e); - } - } - - /** - * Gets the RMI connector for a new run. - * - * @param creator - * Who is creating the workflow run. - * @param workflow - * What workflow are they instantiating. - * @param id - * The identity token for the run, newly minted. - * @return The remote interface to the run. - * @throws Exception - * Just about anything can go wrong... - */ - protected abstract RemoteSingleRun getRealRun(UsernamePrincipal creator, - Workflow workflow, UUID id) throws Exception; - - /** - * How to convert a wrapped workflow into XML. - * - * @param workflow - * The wrapped workflow. - * @return The XML version of the document. - * @throws JAXBException - * If serialization fails. - */ - protected byte[] serializeWorkflow(Workflow workflow) throws JAXBException { - try { - return workflow.getScufl2Bytes(); - } catch (IOException e) { - throw new JAXBException("problem converting to scufl2", e); - } catch (WriterException e) { - throw new JAXBException("problem converting to scufl2", e); - } - } - - private void acceptUsageRecord(String usageRecord) { - if (usageRecordSink != null) - usageRecordSink.storeUsageRecord(usageRecord); - runDB.checkForFinishNow(); - } - - /** - * Make a Remote object that can act as a consumer for usage records. - * - * @param creator - * - * @return The receiver, or <tt>null</tt> if the construction fails. - */ - protected UsageRecordReceiver makeURReciver(UsernamePrincipal creator) { - try { - @SuppressWarnings("serial") - class URReceiver extends UnicastRemoteObject implements - UsageRecordReceiver { - public URReceiver() throws RemoteException { - super(); - } - - @Override - public void acceptUsageRecord(String usageRecord) { - AbstractRemoteRunFactory.this.acceptUsageRecord(usageRecord); - } - } - return new URReceiver(); - } catch (RemoteException e) { - log.warn("failed to build usage record receiver", e); - return null; - } - } - - @Override - public EventDAO getMasterEventFeed() { - return masterEventFeed; - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java deleted file mode 100644 index b67e121..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/ForkRunFactory.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Copyright (C) 2010-2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import static java.lang.System.getProperty; -import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; -import static java.util.Calendar.SECOND; -import static java.util.UUID.randomUUID; -import static org.taverna.server.master.TavernaServer.JMX_ROOT; - -import java.io.File; -import java.rmi.ConnectException; -import java.rmi.ConnectIOException; -import java.rmi.NotBoundException; -import java.rmi.RemoteException; -import java.util.Calendar; -import java.util.UUID; - -import javax.annotation.Nonnull; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.xml.bind.JAXBException; - -import org.springframework.jmx.export.annotation.ManagedAttribute; -import org.springframework.jmx.export.annotation.ManagedResource; -import org.taverna.server.localworker.remote.RemoteRunFactory; -import org.taverna.server.localworker.remote.RemoteSingleRun; -import org.taverna.server.master.common.Workflow; -import org.taverna.server.master.exceptions.NoCreateException; -import org.taverna.server.master.factories.ConfigurableRunFactory; -import org.taverna.server.master.utils.UsernamePrincipal; - -/** - * A simple factory for workflow runs that forks runs from a subprocess. - * - * @author Donal Fellows - */ -@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for simple singleton forked run.") -public class ForkRunFactory extends AbstractRemoteRunFactory implements - ConfigurableRunFactory { - private int lastStartupCheckCount; - private Integer lastExitCode; - private RemoteRunFactory factory; - private Process factoryProcess; - private String factoryProcessName; - - /** - * Create a factory for remote runs that works by forking off a subprocess. - * - * @throws JAXBException - * Shouldn't happen. - */ - public ForkRunFactory() throws JAXBException { - } - - @PostConstruct - protected void initRegistry() { - log.info("waiting for availability of default RMI registry"); - getTheRegistry(); - } - - @Override - protected void reinitFactory() { - boolean makeFactory = factory != null; - killFactory(); - try { - if (makeFactory) - initFactory(); - } catch (Exception e) { - log.fatal("failed to make connection to remote run factory", e); - } - } - - private RemoteRunFactory getFactory() throws RemoteException { - try { - initFactory(); - } catch (RemoteException e) { - throw e; - } catch (Exception e) { - throw new RemoteException("problem constructing factory", e); - } - return factory; - } - - /** - * @return How many checks were done for the worker process the last time a - * spawn was tried. - */ - @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) - @Override - public int getLastStartupCheckCount() { - return lastStartupCheckCount; - } - - /** - * @return What was the exit code from the last time the factory subprocess - * was killed? - */ - @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") - @Override - public Integer getLastExitCode() { - return lastExitCode; - } - - /** - * @return What the factory subprocess's main RMI interface is registered - * as. - */ - @ManagedAttribute(description = "What the factory subprocess's main RMI interface is registered as.", currencyTimeLimit = 60) - @Override - public String getFactoryProcessName() { - return factoryProcessName; - } - - /** - * Makes the subprocess that manufactures runs. - * - * @throws Exception - * If anything goes wrong. - */ - public void initFactory() throws Exception { - if (factory != null) - return; - // Generate the arguments to use when spawning the subprocess - factoryProcessName = state.getFactoryProcessNamePrefix() + randomUUID(); - ProcessBuilder p = new ProcessBuilder(getJavaBinary()); - p.command().add("-jar"); - p.command().add(getServerWorkerJar()); - if (getExecuteWorkflowScript() == null) - log.fatal("no execute workflow script"); - p.command().add(getExecuteWorkflowScript()); - p.command().addAll(asList(getExtraArguments())); - p.command().add(factoryProcessName); - p.redirectErrorStream(true); - p.directory(new File(getProperty("javax.servlet.context.tempdir", - getProperty("java.io.tmpdir")))); - - // Spawn the subprocess - log.info("about to create subprocess: " + p.command()); - - factoryProcess = launchSubprocess(p); - outlog = new StreamLogger("FactoryStdout", factoryProcess.getInputStream()) { - @Override - protected void write(String msg) { - log.info(msg); - } - }; - errlog = new StreamLogger("FactoryStderr", factoryProcess.getErrorStream()) { - @Override - protected void write(String msg) { - log.info(msg); - } - }; - - // Wait for the subprocess to register itself in the RMI registry - Calendar deadline = Calendar.getInstance(); - deadline.add(SECOND, state.getWaitSeconds()); - Exception lastException = null; - lastStartupCheckCount = 0; - while (deadline.after(Calendar.getInstance())) { - try { - sleep(state.getSleepMS()); - lastStartupCheckCount++; - factory = getRemoteFactoryHandle(factoryProcessName); - initInteractionDetails(factory); - return; - } catch (InterruptedException ie) { - continue; - } catch (NotBoundException nbe) { - lastException = nbe; - log.info("resource \"" + factoryProcessName - + "\" not yet registered..."); - continue; - } catch (RemoteException re) { - // Unpack a remote exception if we can - lastException = re; - try { - if (re.getCause() != null) - lastException = (Exception) re.getCause(); - } catch (Throwable t) { - // Ignore! - } - } catch (RuntimeException e) { - lastException = e; - } - } - if (lastException == null) - lastException = new InterruptedException(); - throw lastException; - } - - private StreamLogger outlog, errlog; - - private void stopLoggers() { - if (outlog != null) - outlog.stop(); - outlog = null; - if (errlog != null) - errlog.stop(); - errlog = null; - } - - private RemoteRunFactory getRemoteFactoryHandle(String name) - throws RemoteException, NotBoundException { - log.info("about to look up resource called " + name); - try { - // Validate registry connection first - getTheRegistry().list(); - } catch (ConnectException | ConnectIOException e) { - log.warn("connection problems with registry", e); - } - RemoteRunFactory rrf = (RemoteRunFactory) getTheRegistry().lookup(name); - log.info("successfully connected to factory subprocess " - + factoryProcessName); - return rrf; - } - - /** - * Destroys the subprocess that manufactures runs. - */ - @PreDestroy - public void killFactory() { - if (factory != null) { - log.info("requesting shutdown of " + factoryProcessName); - try { - factory.shutdown(); - sleep(700); - } catch (RemoteException e) { - log.warn(factoryProcessName + " failed to shut down nicely", e); - } catch (InterruptedException e) { - if (log.isDebugEnabled()) - log.debug("interrupted during wait after asking " - + factoryProcessName + " to shut down", e); - } finally { - factory = null; - } - } - - if (factoryProcess != null) { - int code = -1; - try { - lastExitCode = code = factoryProcess.exitValue(); - log.info(factoryProcessName + " already dead?"); - } catch (RuntimeException e) { - log.info("trying to force death of " + factoryProcessName); - try { - factoryProcess.destroy(); - sleep(350); // takes a little time, even normally - lastExitCode = code = factoryProcess.exitValue(); - } catch (Exception e2) { - code = -1; - } - } finally { - factoryProcess = null; - stopLoggers(); - } - if (code > 128) { - log.info(factoryProcessName + " died with signal=" - + (code - 128)); - } else if (code >= 0) { - log.info(factoryProcessName + " process killed: code=" + code); - } else { - log.warn(factoryProcessName + " not yet dead"); - } - } - } - - /** - * The real core of the run builder, factored out from its reliability - * support. - * - * @param creator - * Who created this workflow? - * @param wf - * The serialized workflow. - * @return The remote handle of the workflow run. - * @throws RemoteException - * If anything fails (communications error, etc.) - */ - private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, - @Nonnull byte[] wf, UUID id) throws RemoteException { - @Nonnull - String globaluser = "Unknown Person"; - if (creator != null) - globaluser = creator.getName(); - RemoteSingleRun rsr = getFactory().make(wf, globaluser, - makeURReciver(creator), id); - incrementRunCount(); - return rsr; - } - - @Override - protected RemoteSingleRun getRealRun(UsernamePrincipal creator, - Workflow workflow, UUID id) throws Exception { - @Nonnull - byte[] wf = serializeWorkflow(workflow); - for (int i = 0; i < 3; i++) { - initFactory(); - try { - return getRealRun(creator, wf, id); - } catch (ConnectException | ConnectIOException e) { - // factory was lost; try to recreate - } - killFactory(); - } - throw new NoCreateException("total failure to connect to factory " - + factoryProcessName + "despite attempting restart"); - } - - @Override - public String[] getFactoryProcessMapping() { - return new String[0]; - } - - @Override - protected int operatingCount() throws Exception { - return getFactory().countOperatingRuns(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java deleted file mode 100644 index e449373..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/IdAwareForkRunFactory.java +++ /dev/null @@ -1,516 +0,0 @@ -/* - * Copyright (C) 2010-2012 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import static java.lang.System.getProperty; -import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; -import static java.util.Calendar.SECOND; -import static java.util.UUID.randomUUID; -import static org.taverna.server.master.TavernaServer.JMX_ROOT; -import static org.taverna.server.master.localworker.AbstractRemoteRunFactory.launchSubprocess; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.rmi.ConnectException; -import java.rmi.ConnectIOException; -import java.rmi.NotBoundException; -import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import javax.annotation.Nonnull; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.xml.bind.JAXBException; - -import org.apache.commons.logging.Log; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.annotation.Order; -import org.springframework.jmx.export.annotation.ManagedAttribute; -import org.springframework.jmx.export.annotation.ManagedResource; -import org.taverna.server.localworker.remote.RemoteRunFactory; -import org.taverna.server.localworker.remote.RemoteSingleRun; -import org.taverna.server.master.common.Workflow; -import org.taverna.server.master.exceptions.NoCreateException; -import org.taverna.server.master.factories.ConfigurableRunFactory; -import org.taverna.server.master.interfaces.LocalIdentityMapper; -import org.taverna.server.master.utils.UsernamePrincipal; - -/** - * A simple factory for workflow runs that forks runs from a subprocess. - * - * @author Donal Fellows - */ -@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for a user-specific forked run.") -public class IdAwareForkRunFactory extends AbstractRemoteRunFactory implements - ConfigurableRunFactory { - private MetaFactory forker; - private Map<String, RemoteRunFactory> factory; - private Map<String, String> factoryProcessName; - - /** - * Create a factory for remote runs that works by forking off a subprocess. - * - * @throws JAXBException - * Shouldn't happen. - */ - public IdAwareForkRunFactory() throws JAXBException { - factory = new HashMap<>(); - factoryProcessName = new HashMap<>(); - } - - @Override - protected void reinitFactory() { - boolean makeForker = forker != null; - try { - killForker(); - } catch (Exception e) { - log.warn("exception when killing secure-fork process", e); - } - try { - if (makeForker) - initMetaFactory(); - } catch (Exception e) { - log.fatal("failed to make secure-fork process", e); - } - } - - /** - * @return How many checks were done for the worker process the last time a - * spawn was tried. - */ - @Override - @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) - public int getLastStartupCheckCount() { - return forker == null ? 0 : forker.lastStartupCheckCount(); - } - - /** - * @return What was the exit code from the last time the factory subprocess - * was killed? - */ - @Override - @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") - public Integer getLastExitCode() { - return forker == null ? null : forker.lastExitCode(); - } - - /** - * @return The mapping of user names to RMI factory IDs. - */ - @Override - @ManagedAttribute(description = "The mapping of user names to RMI factory IDs.", currencyTimeLimit = 60) - public String[] getFactoryProcessMapping() { - ArrayList<String> result = new ArrayList<>(); - ArrayList<String> keys = new ArrayList<>(factoryProcessName.keySet()); - String[] ks = keys.toArray(new String[keys.size()]); - Arrays.sort(ks); - for (String k : ks) { - result.add(k); - result.add(factoryProcessName.get(k)); - } - return result.toArray(new String[result.size()]); - } - - /** - * How construction of factories is actually done. - * - * @author Donal Fellows - */ - public interface MetaFactory { - /** - * Make a factory for the given user. - * - * @param username - * Who to make it for. - * @return Handle of the factory. - * @throws Exception - * If anything goes wrong. - */ - RemoteRunFactory make(String username) throws Exception; - - /** - * Shut down the meta-factory. It is not defined whether factories - * created by it are also shut down at the same time. - * - * @throws IOException - * If something goes wrong when communicating with the - * meta-factory. - * @throws InterruptedException - * If something stops us waiting for the shut down to - * happen. - */ - void close() throws IOException, InterruptedException; - - int lastStartupCheckCount(); - - Integer lastExitCode(); - } - - void registerFactory(String username, String fpn, RemoteRunFactory f) { - factoryProcessName.put(username, fpn); - factory.put(username, f); - } - - /** - * Makes the connection to the meta-factory that makes factories. - * - * @throws IOException - * If the connection fails. - */ - @PostConstruct - void initMetaFactory() throws IOException { - log.info("waiting for availability of default RMI registry"); - getTheRegistry(); - log.info("constructing secure fork subprocess"); - forker = new SecureFork(this, state, log); - } - - private void killForker() throws IOException, InterruptedException { - try { - if (forker != null) - forker.close(); - } finally { - forker = null; - } - } - - /** - * Makes the subprocess that manufactures runs. - * - * @throws Exception - * If anything goes wrong. - */ - private void initFactory(String username) throws Exception { - if (factory.containsKey(username)) - return; - if (forker == null) - initMetaFactory(); - forker.make(username); - } - - /** - * Destroys the subprocess that manufactures runs. - */ - @PreDestroy - public void killFactories() { - if (!factory.isEmpty()) { - Iterator<String> keys = factory.keySet().iterator(); - while (keys.hasNext()) { - String key = keys.next(); - log.info("requesting shutdown of " - + factoryProcessName.get(key)); - try { - factory.get(key).shutdown(); - } catch (RemoteException e) { - log.warn(factoryProcessName.get(key) - + " failed to shut down nicely", e); - } finally { - keys.remove(); - factoryProcessName.remove(key); - } - } - try { - sleep(700); - } catch (InterruptedException e) { - if (log.isDebugEnabled()) - log.debug("interrupted during wait after " - + "asking factories to shut down", e); - } - } - - try { - killForker(); - } catch (Exception e) { - if (log.isDebugEnabled()) - log.debug("exception in shutdown of secure-fork process", e); - } - } - - @Override - protected void finalize() throws Throwable { - killFactories(); - super.finalize(); - } - - @Autowired - public void setIdMapper(LocalIdentityMapper mapper) { - this.mapper = mapper; - } - - private LocalIdentityMapper mapper; - - /** - * The real core of the run builder, factored out from its reliability - * support. - * - * @param creator - * Who created this workflow? - * @param username - * What user account is this workflow to be executed in? - * @param wf - * The serialized workflow. - * @return The remote handle of the workflow run. - * @throws RemoteException - * If anything fails (communications error, etc.) - */ - private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, - @Nonnull String username, @Nonnull byte[] wf, UUID id) - throws RemoteException { - String globaluser = "Unknown Person"; - if (creator != null) - globaluser = creator.getName(); - RemoteSingleRun rsr = factory.get(username).make(wf, globaluser, - makeURReciver(creator), id); - incrementRunCount(); - return rsr; - } - - @Override - protected RemoteSingleRun getRealRun(UsernamePrincipal creator, - Workflow workflow, UUID id) throws Exception { - byte[] wf = serializeWorkflow(workflow); - String username = mapper == null ? null : mapper - .getUsernameForPrincipal(creator); - if (username == null) - throw new Exception("cannot determine who to run workflow as; " - + "local identity mapper returned null"); - for (int i = 0; i < 3; i++) { - if (!factory.containsKey(username)) - initFactory(username); - try { - return getRealRun(creator, username, wf, id); - } catch (ConnectException | ConnectIOException e) { - // factory was lost; try to recreate - } - factory.remove(username); - } - throw new NoCreateException("total failure to connect to factory " - + factoryProcessName + "despite attempting restart"); - } - - @Value("${secureForkPasswordFile}") - @Order(20) - public void setPasswordSource(String passwordSource) { - if (passwordSource == null || passwordSource.isEmpty() - || passwordSource.startsWith("${")) - state.setDefaultPasswordFile(null); - else - state.setDefaultPasswordFile(passwordSource); - if (state.getPasswordFile() == null) - log.info("assuming password-free forking enabled"); - else - log.info("configured secureForkPasswordFile from context as " - + state.getPasswordFile()); - } - - @Override - public String getFactoryProcessName() { - return "<PROPERTY-NOT-SUPPORTED>"; - } - - @Override - protected int operatingCount() throws Exception { - int total = 0; - for (RemoteRunFactory rrf : factory.values()) - total += rrf.countOperatingRuns(); - return total; - } -} - -/** - * The connector that handles the secure fork process itself. - * - * @author Donal Fellows - */ -class SecureFork implements IdAwareForkRunFactory.MetaFactory { - private IdAwareForkRunFactory main; - private Process process; - private PrintWriter channel; - private int lastStartupCheckCount; - private Integer lastExitCode; - private Log log; - private LocalWorkerState state; - private StreamLogger out, err; - - /** - * Construct the command to run the meta-factory process. - * - * @param args - * The live list of arguments to pass. - */ - public void initFactoryArgs(List<String> args) { - args.add(main.getJavaBinary()); - String pwf = main.getPasswordFile(); - if (pwf != null) { - args.add("-Dpassword.file=" + pwf); - } - args.add("-jar"); - args.add(main.getServerForkerJar()); - args.add(main.getJavaBinary()); - args.add("-jar"); - args.add(main.getServerWorkerJar()); - if (main.getExecuteWorkflowScript() == null) - log.fatal("no execute workflow script"); - args.add(main.getExecuteWorkflowScript()); - args.addAll(asList(main.getExtraArguments())); - } - - SecureFork(IdAwareForkRunFactory main, LocalWorkerState state, Log log) - throws IOException { - this.main = main; - this.log = log; - this.state = state; - ProcessBuilder p = new ProcessBuilder(); - initFactoryArgs(p.command()); - p.redirectErrorStream(true); - p.directory(new File(getProperty("javax.servlet.context.tempdir", - getProperty("java.io.tmpdir")))); - - // Spawn the subprocess - log.info("about to create subprocess: " + p.command()); - log.info("subprocess directory: " + p.directory()); - process = launchSubprocess(p); - channel = new PrintWriter(new BufferedWriter(new OutputStreamWriter( - process.getOutputStream())), true); - // Log the responses - out = new StreamLogger("ForkedStdout", process.getInputStream()) { - @Override - protected void write(String msg) { - log.info(msg); - } - }; - err = new StreamLogger("ForkedStderr", process.getErrorStream()) { - @Override - protected void write(String msg) { - log.info(msg); - } - }; - } - - @Override - public void close() throws IOException, InterruptedException { - try { - if (process != null) { - log.info("about to close down subprocess"); - channel.close(); - int code = -1; - try { - try { - code = process.exitValue(); - log.info("secure-fork process already dead?"); - } catch (IllegalThreadStateException e) { - try { - code = process.waitFor(); - } catch (InterruptedException e1) { - log.info("interrupted waiting for natural death of secure-fork process?!"); - process.destroy(); - code = process.waitFor(); - } - } - } finally { - lastExitCode = code; - if (code > 128) { - log.info("secure-fork process died with signal=" - + (code - 128)); - } else if (code >= 0) { - log.info("secure-fork process killed: code=" + code); - } else { - log.warn("secure-fork process not yet dead"); - } - } - } - } finally { - process = null; - channel = null; - out.stop(); - err.stop(); - } - } - - protected void make(String username, String fpn) { - log.info("about to request subprocess creation for " + username - + " producing ID " + fpn); - channel.println(username + " " + fpn); - } - - @Override - public RemoteRunFactory make(String username) throws Exception { - try { - main.getTheRegistry().list(); // Validate registry connection first - } catch (ConnectException | ConnectIOException e) { - log.warn("connection problems with registry", e); - } catch (RemoteException e) { - if (e.getCause() != null && e.getCause() instanceof Exception) { - throw (Exception) e.getCause(); - } - log.warn("connection problems with registry", e); - } - - String fpn = state.getFactoryProcessNamePrefix() + randomUUID(); - make(username, fpn); - - // Wait for the subprocess to register itself in the RMI registry - Calendar deadline = Calendar.getInstance(); - deadline.add(SECOND, state.getWaitSeconds()); - Exception lastException = null; - lastStartupCheckCount = 0; - while (deadline.after(Calendar.getInstance())) { - try { - sleep(state.getSleepMS()); - lastStartupCheckCount++; - log.info("about to look up resource called " + fpn); - RemoteRunFactory f = (RemoteRunFactory) main.getTheRegistry() - .lookup(fpn); - log.info("successfully connected to factory subprocess " + fpn); - main.initInteractionDetails(f); - main.registerFactory(username, fpn, f); - return f; - } catch (InterruptedException ie) { - continue; - } catch (NotBoundException nbe) { - lastException = nbe; - log.info("resource \"" + fpn + "\" not yet registered..."); - continue; - } catch (RemoteException re) { - // Unpack a remote exception if we can - lastException = re; - try { - if (re.getCause() != null) - lastException = (Exception) re.getCause(); - } catch (Throwable t) { - // Ignore! - } - } catch (Exception e) { - lastException = e; - } - } - if (lastException == null) - lastException = new InterruptedException(); - throw lastException; - } - - @Override - public Integer lastExitCode() { - return lastExitCode; - } - - @Override - public int lastStartupCheckCount() { - return lastStartupCheckCount; - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java deleted file mode 100644 index f890204..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (C) 2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Provider of the configuration of the "localworker.factory" bean, which is - * sufficiently complex to be too hard to manufacture directly from the XML - * configuration. - * - * @author Donal Fellows - */ -@Configuration -public class LocalWorkerFactory { - @Bean(name = "localworker.factory") - AbstractRemoteRunFactory getLocalworkerFactory( - @Value("${backEndFactory}") String mode) throws Exception { - if (mode == null || mode.isEmpty() || mode.startsWith("${")) - throw new Exception("no value for ${backEndFactory}"); - Class<?> c = Class.forName(mode); - if (AbstractRemoteRunFactory.class.isAssignableFrom(c)) - return (AbstractRemoteRunFactory) c.newInstance(); - throw new Exception("unknown remote run factory: " + mode); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java deleted file mode 100644 index 9f0f39b..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/LocalWorkerState.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Copyright (C) 2010-2013 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import static java.io.File.separator; -import static java.lang.System.getProperty; -import static java.rmi.registry.Registry.REGISTRY_PORT; -import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static java.util.Collections.unmodifiableList; -import static org.taverna.server.master.defaults.Default.EXTRA_ARGUMENTS; -import static org.taverna.server.master.defaults.Default.PASSWORD_FILE; -import static org.taverna.server.master.defaults.Default.REGISTRY_JAR; -import static org.taverna.server.master.defaults.Default.RMI_PREFIX; -import static org.taverna.server.master.defaults.Default.RUN_LIFE_MINUTES; -import static org.taverna.server.master.defaults.Default.RUN_OPERATING_LIMIT; -import static org.taverna.server.master.defaults.Default.SECURE_FORK_IMPLEMENTATION_JAR; -import static org.taverna.server.master.defaults.Default.SERVER_WORKER_IMPLEMENTATION_JAR; -import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_POLL_SLEEP; -import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_WAIT; -import static org.taverna.server.master.localworker.PersistedState.KEY; -import static org.taverna.server.master.localworker.PersistedState.makeInstance; - -import java.io.File; -import java.io.FilenameFilter; -import java.net.URI; -import java.util.List; - -import javax.annotation.PostConstruct; -import javax.jdo.annotations.PersistenceAware; - -import org.springframework.beans.factory.annotation.Required; -import org.taverna.server.master.common.Status; -import org.taverna.server.master.defaults.Default; -import org.taverna.server.master.utils.JDOSupport; -import org.taverna.server.master.worker.WorkerModel; - -/** - * The persistent state of a local worker factory. - * - * @author Donal Fellows - */ -@PersistenceAware -public class LocalWorkerState extends JDOSupport<PersistedState> implements - WorkerModel { - public LocalWorkerState() { - super(PersistedState.class); - } - - private LocalWorkerState self; - - @Required - public void setSelf(LocalWorkerState self) { - this.self = self; - } - - /** Initial lifetime of runs, in minutes. */ - int defaultLifetime; - /** - * Maximum number of runs to exist at once. Note that this includes when - * they are just existing for the purposes of file transfer ( - * {@link Status#Initialized}/{@link Status#Finished} states). - */ - int maxRuns; - /** - * Prefix to use for RMI names. - */ - String factoryProcessNamePrefix; - /** - * Full path name of the script used to start running a workflow; normally - * expected to be "<i>somewhere/</i><tt>executeWorkflow.sh</tt>". - */ - String executeWorkflowScript; - /** Default value for {@link #executeWorkflowScript}. */ - private transient String defaultExecuteWorkflowScript; - /** - * Full path name of the file containing the password used to launch workers - * as other users. The file is normally expected to contain a single line, - * the password, and to be thoroughly locked down so only the user running - * the server (e.g., "<tt>tomcat</tt>") can read it; it will probably reside - * in either the user's home directory or in a system configuration - * directory. - */ - String passwordFile; - /** Default value for {@link #passwordFile}. */ - private transient String defaultPasswordFile = PASSWORD_FILE; - /** - * The extra arguments to pass to the subprocess. - */ - String[] extraArgs; - /** - * How long to wait for subprocess startup, in seconds. - */ - int waitSeconds; - /** - * Polling interval to use during startup, in milliseconds. - */ - int sleepMS; - /** - * Full path name to the worker process's implementation JAR. - */ - String serverWorkerJar; - private static final String DEFAULT_WORKER_JAR = LocalWorkerState.class - .getClassLoader().getResource(SERVER_WORKER_IMPLEMENTATION_JAR) - .getFile(); - /** - * Full path name to the Java binary to use to run the subprocess. - */ - String javaBinary; - private static final String DEFAULT_JAVA_BINARY = getProperty("java.home") - + separator + "bin" + separator + "java"; - /** - * Full path name to the secure fork process's implementation JAR. - */ - String serverForkerJar; - private static final String DEFAULT_FORKER_JAR = LocalWorkerState.class - .getClassLoader().getResource(SECURE_FORK_IMPLEMENTATION_JAR) - .getFile(); - - String registryHost; - int registryPort; - - int operatingLimit; - - URI[] permittedWorkflows; - private String registryJar; - private static final String DEFAULT_REGISTRY_JAR = LocalWorkerState.class - .getClassLoader().getResource(REGISTRY_JAR).getFile(); - - @Override - public void setDefaultLifetime(int defaultLifetime) { - this.defaultLifetime = defaultLifetime; - if (loadedState) - self.store(); - } - - @Override - public int getDefaultLifetime() { - return defaultLifetime < 1 ? RUN_LIFE_MINUTES : defaultLifetime; - } - - @Override - public void setMaxRuns(int maxRuns) { - this.maxRuns = maxRuns; - if (loadedState) - self.store(); - } - - @Override - public int getMaxRuns() { - return maxRuns < 1 ? Default.RUN_COUNT_MAX : maxRuns; - } - - @Override - public int getOperatingLimit() { - return operatingLimit < 1 ? RUN_OPERATING_LIMIT : operatingLimit; - } - - @Override - public void setOperatingLimit(int operatingLimit) { - this.operatingLimit = operatingLimit; - if (loadedState) - self.store(); - } - - @Override - public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) { - this.factoryProcessNamePrefix = factoryProcessNamePrefix; - if (loadedState) - self.store(); - } - - @Override - public String getFactoryProcessNamePrefix() { - return factoryProcessNamePrefix == null ? RMI_PREFIX - : factoryProcessNamePrefix; - } - - @Override - public void setExecuteWorkflowScript(String executeWorkflowScript) { - this.executeWorkflowScript = executeWorkflowScript; - if (loadedState) - self.store(); - } - - @Override - public String getExecuteWorkflowScript() { - return executeWorkflowScript == null ? defaultExecuteWorkflowScript - : executeWorkflowScript; - } - - private static String guessWorkflowScript() { - File utilDir = new File(DEFAULT_WORKER_JAR).getParentFile(); - File[] dirs = utilDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("taverna-commandline-"); - } - }); - assert dirs.length > 0; - return new File(dirs[0], "executeworkflow.sh").toString(); - } - - /** - * Set what executeworkflow script to use by default. This is the value that - * is used if not overridden by the administration interface. - * - * @param defaultScript - * Full path to the script to use. - */ - public void setDefaultExecuteWorkflowScript(String defaultScript) { - if (defaultScript.startsWith("${") || defaultScript.equals("NONE")) { - this.defaultExecuteWorkflowScript = guessWorkflowScript(); - return; - } - this.defaultExecuteWorkflowScript = defaultScript; - } - - String getDefaultExecuteWorkflowScript() { - return defaultExecuteWorkflowScript; - } - - @Override - public void setExtraArgs(String[] extraArgs) { - this.extraArgs = extraArgs.clone(); - if (loadedState) - self.store(); - } - - @Override - public String[] getExtraArgs() { - return extraArgs == null ? EXTRA_ARGUMENTS : extraArgs.clone(); - } - - @Override - public void setWaitSeconds(int waitSeconds) { - this.waitSeconds = waitSeconds; - if (loadedState) - self.store(); - } - - @Override - public int getWaitSeconds() { - return waitSeconds < 1 ? SUBPROCESS_START_WAIT : waitSeconds; - } - - @Override - public void setSleepMS(int sleepMS) { - this.sleepMS = sleepMS; - if (loadedState) - self.store(); - } - - @Override - public int getSleepMS() { - return sleepMS < 1 ? SUBPROCESS_START_POLL_SLEEP : sleepMS; - } - - @Override - public void setServerWorkerJar(String serverWorkerJar) { - this.serverWorkerJar = serverWorkerJar; - if (loadedState) - self.store(); - } - - @Override - public String getServerWorkerJar() { - return serverWorkerJar == null ? DEFAULT_WORKER_JAR : serverWorkerJar; - } - - @Override - public void setServerForkerJar(String serverForkerJar) { - this.serverForkerJar = serverForkerJar; - if (loadedState) - self.store(); - } - - @Override - public String getServerForkerJar() { - return serverForkerJar == null ? DEFAULT_FORKER_JAR : serverForkerJar; - } - - @Override - public void setJavaBinary(String javaBinary) { - this.javaBinary = javaBinary; - if (loadedState) - self.store(); - } - - @Override - public String getJavaBinary() { - return javaBinary == null ? DEFAULT_JAVA_BINARY : javaBinary; - } - - @Override - public void setPasswordFile(String passwordFile) { - this.passwordFile = passwordFile; - if (loadedState) - self.store(); - } - - @Override - public String getPasswordFile() { - return passwordFile == null ? defaultPasswordFile : passwordFile; - } - - void setDefaultPasswordFile(String defaultPasswordFile) { - this.defaultPasswordFile = defaultPasswordFile; - } - - @Override - public void setRegistryHost(String registryHost) { - this.registryHost = (registryHost == null ? "" : registryHost); - if (loadedState) - self.store(); - } - - @Override - public String getRegistryHost() { - return (registryHost == null || registryHost.isEmpty()) ? null - : registryHost; - } - - @Override - public void setRegistryPort(int registryPort) { - this.registryPort = ((registryPort < 1 || registryPort > 65534) ? REGISTRY_PORT - : registryPort); - if (loadedState) - self.store(); - } - - @Override - public int getRegistryPort() { - return registryPort == 0 ? REGISTRY_PORT : registryPort; - } - - @Override - public String getRegistryJar() { - return registryJar == null ? DEFAULT_REGISTRY_JAR : registryJar; - } - - @Override - public void setRegistryJar(String rmiRegistryJar) { - this.registryJar = (rmiRegistryJar == null || rmiRegistryJar.isEmpty()) ? null - : rmiRegistryJar; - if (loadedState) - self.store(); - } - - @Override - public List<URI> getPermittedWorkflowURIs() { - if (permittedWorkflows == null || permittedWorkflows.length == 0) - return emptyList(); - return unmodifiableList(asList(permittedWorkflows)); - } - - @Override - public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) { - if (permittedWorkflows == null || permittedWorkflows.isEmpty()) - this.permittedWorkflows = new URI[0]; - else - this.permittedWorkflows = permittedWorkflows - .toArray(new URI[permittedWorkflows.size()]); - if (loadedState) - self.store(); - } - - public static final boolean DEFAULT_GENERATE_PROVENANCE = false; - private Boolean generateProvenance; - - @Override - public boolean getGenerateProvenance() { - Boolean g = generateProvenance; - return g == null ? DEFAULT_GENERATE_PROVENANCE : (boolean) g; - } - - @Override - public void setGenerateProvenance(boolean generate) { - this.generateProvenance = generate; - if (loadedState) - self.store(); - } - - // -------------------------------------------------------------- - - private boolean loadedState; - - @PostConstruct - @WithinSingleTransaction - public void load() { - if (loadedState || !isPersistent()) - return; - WorkerModel state = getById(KEY); - if (state == null) { - store(); - return; - } - - defaultLifetime = state.getDefaultLifetime(); - executeWorkflowScript = state.getExecuteWorkflowScript(); - extraArgs = state.getExtraArgs(); - factoryProcessNamePrefix = state.getFactoryProcessNamePrefix(); - javaBinary = state.getJavaBinary(); - maxRuns = state.getMaxRuns(); - serverWorkerJar = state.getServerWorkerJar(); - serverForkerJar = state.getServerForkerJar(); - passwordFile = state.getPasswordFile(); - sleepMS = state.getSleepMS(); - waitSeconds = state.getWaitSeconds(); - registryHost = state.getRegistryHost(); - registryPort = state.getRegistryPort(); - operatingLimit = state.getOperatingLimit(); - List<URI> pwu = state.getPermittedWorkflowURIs(); - permittedWorkflows = (URI[]) pwu.toArray(new URI[pwu.size()]); - registryJar = state.getRegistryJar(); - generateProvenance = state.getGenerateProvenance(); - - loadedState = true; - } - - @WithinSingleTransaction - public void store() { - if (!isPersistent()) - return; - WorkerModel state = getById(KEY); - if (state == null) - state = persist(makeInstance()); - - state.setDefaultLifetime(defaultLifetime); - state.setExecuteWorkflowScript(executeWorkflowScript); - state.setExtraArgs(extraArgs); - state.setFactoryProcessNamePrefix(factoryProcessNamePrefix); - state.setJavaBinary(javaBinary); - state.setMaxRuns(maxRuns); - state.setServerWorkerJar(serverWorkerJar); - state.setServerForkerJar(serverForkerJar); - state.setPasswordFile(passwordFile); - state.setSleepMS(sleepMS); - state.setWaitSeconds(waitSeconds); - state.setRegistryHost(registryHost); - state.setRegistryPort(registryPort); - state.setOperatingLimit(operatingLimit); - if (permittedWorkflows != null) - state.setPermittedWorkflowURIs(asList(permittedWorkflows)); - state.setRegistryJar(registryJar); - if (generateProvenance != null) - state.setGenerateProvenance(generateProvenance); - - loadedState = true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java deleted file mode 100644 index 83d6bda..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/PersistedState.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (C) 2010-2013 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.localworker; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import javax.jdo.annotations.Join; -import javax.jdo.annotations.PersistenceCapable; -import javax.jdo.annotations.Persistent; -import javax.jdo.annotations.PrimaryKey; - -import org.taverna.server.master.worker.WorkerModel; - -/** - * The actual database connector for persisted local worker state. - * - * @author Donal Fellows - */ -/* - * WARNING! If you change the name of this class, update persistence.xml as - * well! - */ -@PersistenceCapable(table = PersistedState.TABLE) -class PersistedState implements WorkerModel { - static final String TABLE = "LOCALWORKERSTATE__PERSISTEDSTATE"; - - static PersistedState makeInstance() { - PersistedState o = new PersistedState(); - o.ID = KEY; - return o; - } - - @PrimaryKey(column = "ID") - protected int ID; - - static final int KEY = 32; - - @Persistent - private int defaultLifetime; - @Persistent - private int maxRuns; - @Persistent - private String factoryProcessNamePrefix; - @Persistent - private String executeWorkflowScript; - @Persistent(serialized = "true") - private String[] extraArgs; - @Persistent - private int waitSeconds; - @Persistent - private int sleepMS; - @Persistent - private String serverWorkerJar; - @Persistent - private String serverForkerJar; - @Persistent - private String registryJar; - @Persistent - private String passwordFile; - @Persistent - private String javaBinary; - @Persistent - private int registryPort; - @Persistent - private String registryHost; - @Persistent - private int operatingLimit; - @Persistent(defaultFetchGroup = "true") - @Join(table = TABLE + "_PERMWFURI", column = "ID") - private String[] permittedWorkflows; - @Persistent - private int generateProvenance; - - @Override - public void setDefaultLifetime(int defaultLifetime) { - this.defaultLifetime = defaultLifetime; - } - - @Override - public int getDefaultLifetime() { - return defaultLifetime; - } - - @Override - public void setMaxRuns(int maxRuns) { - this.maxRuns = maxRuns; - } - - @Override - public int getMaxRuns() { - return maxRuns; - } - - @Override - public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) { - this.factoryProcessNamePrefix = factoryProcessNamePrefix; - } - - @Override - public String getFactoryProcessNamePrefix() { - return factoryProcessNamePrefix; - } - - @Override - public void setExecuteWorkflowScript(String executeWorkflowScript) { - this.executeWorkflowScript = executeWorkflowScript; - } - - @Override - public String getExecuteWorkflowScript() { - return executeWorkflowScript; - } - - @Override - public void setExtraArgs(String[] extraArgs) { - this.extraArgs = extraArgs; - } - - @Override - public String[] getExtraArgs() { - return extraArgs; - } - - @Override - public void setWaitSeconds(int waitSeconds) { - this.waitSeconds = waitSeconds; - } - - @Override - public int getWaitSeconds() { - return waitSeconds; - } - - @Override - public void setSleepMS(int sleepMS) { - this.sleepMS = sleepMS; - } - - @Override - public int getSleepMS() { - return sleepMS; - } - - @Override - public void setServerWorkerJar(String serverWorkerJar) { - this.serverWorkerJar = serverWorkerJar; - } - - @Override - public String getServerWorkerJar() { - return serverWorkerJar; - } - - @Override - public void setJavaBinary(String javaBinary) { - this.javaBinary = javaBinary; - } - - @Override - public String getJavaBinary() { - return javaBinary; - } - - @Override - public void setRegistryPort(int registryPort) { - this.registryPort = registryPort; - } - - @Override - public int getRegistryPort() { - return registryPort; - } - - @Override - public void setRegistryHost(String registryHost) { - this.registryHost = registryHost; - } - - @Override - public String getRegistryHost() { - return registryHost; - } - - @Override - public void setServerForkerJar(String serverForkerJar) { - this.serverForkerJar = serverForkerJar; - } - - @Override - public String getServerForkerJar() { - return serverForkerJar; - } - - @Override - public void setPasswordFile(String passwordFile) { - this.passwordFile = passwordFile; - } - - @Override - public String getPasswordFile() { - return passwordFile; - } - - @Override - public void setOperatingLimit(int operatingLimit) { - this.operatingLimit = operatingLimit; - } - - @Override - public int getOperatingLimit() { - return operatingLimit; - } - - @Override - public List<URI> getPermittedWorkflowURIs() { - String[] pw = this.permittedWorkflows; - if (pw == null) - return new ArrayList<>(); - List<URI> uris = new ArrayList<>(pw.length); - for (String uri : pw) - uris.add(URI.create(uri)); - return uris; - } - - @Override - public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) { - String[] pw = new String[permittedWorkflows.size()]; - for (int i = 0; i < pw.length; i++) - pw[i] = permittedWorkflows.get(i).toString(); - this.permittedWorkflows = pw; - } - - @Override - public String getRegistryJar() { - return registryJar; - } - - @Override - public void setRegistryJar(String registryJar) { - this.registryJar = registryJar; - } - - @Override - public boolean getGenerateProvenance() { - return generateProvenance > 0; - } - - @Override - public void setGenerateProvenance(boolean generateProvenance) { - this.generateProvenance = (generateProvenance ? 1 : 0); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java deleted file mode 100644 index f361e17..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/StreamLogger.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.taverna.server.master.localworker; - -import static java.lang.Thread.interrupted; -import static org.apache.commons.logging.LogFactory.getLog; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - -import org.apache.commons.logging.Log; - -abstract class StreamLogger { - protected final Log log; - private Thread t; - private InputStream in; - - protected StreamLogger(final String name, InputStream is) { - log = getLog("Taverna.Server.LocalWorker." + name); - in = is; - t = new Thread(new Runnable() { - @Override - public void run() { - try (BufferedReader br = new BufferedReader( - new InputStreamReader(in))) { - String line; - while (!interrupted() && (line = br.readLine()) != null) - if (!line.isEmpty()) - write(line); - } catch (IOException e) { - // Do nothing... - } catch (Exception e) { - log.warn("failure in reading from " + name, e); - } - } - }, name + ".StreamLogger"); - t.setContextClassLoader(null); - t.setDaemon(true); - t.start(); - } - - /** - * Write a line read from the subprocess to the log. - * <p> - * This needs to be implemented by subclasses in order for the log to be - * correctly written with the class name. - * - * @param msg - * The message to write. Guaranteed to have no newline characters - * in it and to be non-empty. - */ - protected abstract void write(String msg); - - public void stop() { - log.info("trying to close down " + t.getName()); - t.interrupt(); - try { - in.close(); - } catch (IOException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java b/server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java deleted file mode 100644 index 7139dd7..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/localworker/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright (C) 2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -/** - * Implementation of a Taverna Server back-end that works by forking off - * workflow executors on the local system. - */ -package org.taverna.server.master.localworker; http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java b/server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java deleted file mode 100644 index 3e27806..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/notification/EmailDispatcher.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (C) 2010-2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.notification; - -import static javax.ws.rs.core.MediaType.TEXT_PLAIN; - -import javax.annotation.PostConstruct; - -import org.springframework.beans.factory.annotation.Required; -import org.springframework.mail.MailSender; -import org.springframework.mail.SimpleMailMessage; -import org.springframework.mail.javamail.JavaMailSender; - -/** - * How to send a plain text message by email to someone. - * - * @author Donal Fellows - */ -public class EmailDispatcher extends RateLimitedDispatcher { - @Override - public String getName() { - return "mailto"; - } - - /** - * @param from - * Email address that the notification is to come from. - */ - @Required - public void setFrom(String from) { - this.from = valid(from, ""); - } - - /** - * @param host - * The outgoing SMTP server address. - */ - @Required - public void setSmtpHost(String host) { - this.host = valid(host, ""); - } - - /** - * @param contentType - * The content type of the message to be sent. For example, " - * <tt>text/plain</tt>". - */ - public void setMessageContentType(String contentType) { - this.contentType = contentType; - } - - /** - * @param sender - * the sender to set - */ - public void setSender(MailSender sender) { - this.sender = sender; - } - - private String from; - private String host; - private MailSender sender; - @SuppressWarnings("unused") - private String contentType = TEXT_PLAIN; - - /** - * Try to perform the lookup of the email service. This is called during - * configuration so that any failure happens at a useful, predictable time. - */ - @PostConstruct - public void tryLookup() { - if (!isAvailable()) { - log.warn("no mail support; disabling email dispatch"); - sender = null; - return; - } - try { - if (sender instanceof JavaMailSender) - ((JavaMailSender) sender).createMimeMessage(); - } catch (Throwable t) { - log.warn("sender having problems constructing messages; " - + "disabling...", t); - sender = null; - } - } - - @Override - public void dispatch(String messageSubject, String messageContent, String to) - throws Exception { - // Simple checks for acceptability - if (!to.matches(".+@.+")) { - log.info("did not send email notification: improper email address \"" - + to + "\""); - return; - } - - SimpleMailMessage message = new SimpleMailMessage(); - message.setFrom(from); - message.setTo(to.trim()); - message.setSubject(messageSubject); - message.setText(messageContent); - sender.send(message); - } - - @Override - public boolean isAvailable() { - return (host != null && !host.isEmpty() && sender != null - && from != null && !from.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/notification/JabberDispatcher.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/notification/JabberDispatcher.java b/server-webapp/src/main/java/org/taverna/server/master/notification/JabberDispatcher.java deleted file mode 100644 index 61640bf..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/notification/JabberDispatcher.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright (C) 2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ - -package org.taverna.server.master.notification; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jivesoftware.smack.Chat; -import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.MessageListener; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.packet.Message; -import org.taverna.server.master.interfaces.MessageDispatcher; -import org.taverna.server.master.interfaces.TavernaRun; - -/** - * Send notifications by Jabber/XMPP. - * - * @author Donal Fellows - */ -public class JabberDispatcher implements MessageDispatcher { - @Override - public String getName() { - return "xmpp"; - } - - private Log log = LogFactory.getLog("Taverna.Server.Notification"); - private XMPPConnection conn; - private String resource = "TavernaServer"; - private String host = ""; - private String user = ""; - private String pass = ""; - - /** - * @param resource - * The XMPP resource to use when connecting the server. This - * defaults to "<tt>TavernaServer</tt>". - */ - public void setResource(String resource) { - this.resource = resource; - } - - /** - * @param service - * The XMPP service URL. - */ - public void setHost(String service) { - if (service == null || service.trim().isEmpty() - || service.trim().startsWith("$")) - this.host = ""; - else - this.host = service.trim(); - } - - /** - * @param user - * The user identity to use with the XMPP service. - */ - public void setUsername(String user) { - if (user == null || user.trim().isEmpty() - || user.trim().startsWith("$")) - this.user = ""; - else - this.user = user.trim(); - } - - /** - * @param pass - * The password to use with the XMPP service. - */ - public void setPassword(String pass) { - if (pass == null || pass.trim().isEmpty() - || pass.trim().startsWith("$")) - this.pass = ""; - else - this.pass = pass.trim(); - } - - @PostConstruct - void setup() { - try { - if (host.isEmpty() || user.isEmpty() || pass.isEmpty()) { - log.info("disabling XMPP support; incomplete configuration"); - conn = null; - return; - } - ConnectionConfiguration cfg = new ConnectionConfiguration(host); - cfg.setSendPresence(false); - XMPPConnection c = new XMPPConnection(cfg); - c.connect(); - c.login(user, pass, resource); - conn = c; - log.info("connected to XMPP service <" + host + "> as user <" - + user + ">"); - } catch (Exception e) { - log.info("failed to connect to XMPP server", e); - } - } - - @PreDestroy - public void close() { - if (conn != null) - conn.disconnect(); - conn = null; - } - - @Override - public boolean isAvailable() { - return conn != null; - } - - @Override - public void dispatch(TavernaRun ignored, String messageSubject, - String messageContent, String targetParameter) throws Exception { - Chat chat = conn.getChatManager().createChat(targetParameter, - new DroppingListener()); - Message m = new Message(); - m.addBody(null, messageContent); - m.setSubject(messageSubject); - chat.sendMessage(m); - } - - static class DroppingListener implements MessageListener { - private Log log = LogFactory - .getLog("Taverna.Server.Notification.Jabber"); - - @Override - public void processMessage(Chat chat, Message message) { - if (log.isDebugEnabled()) - log.debug("unexpectedly received XMPP message from <" - + message.getFrom() + ">; ignoring"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/server-webapp/src/main/java/org/taverna/server/master/notification/NotificationEngine.java ---------------------------------------------------------------------- diff --git a/server-webapp/src/main/java/org/taverna/server/master/notification/NotificationEngine.java b/server-webapp/src/main/java/org/taverna/server/master/notification/NotificationEngine.java deleted file mode 100644 index bc0f60d..0000000 --- a/server-webapp/src/main/java/org/taverna/server/master/notification/NotificationEngine.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (C) 2010-2011 The University of Manchester - * - * See the file "LICENSE" for license terms. - */ -package org.taverna.server.master.notification; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.beans.factory.annotation.Required; -import org.taverna.server.master.interfaces.MessageDispatcher; -import org.taverna.server.master.interfaces.TavernaRun; - -/** - * A common object for handling dispatch of event-driven messages. - * - * @author Donal Fellows - */ -public class NotificationEngine { - private Log log = LogFactory.getLog("Taverna.Server.Notification"); - private Map<String, MessageDispatcher> dispatchers; - private List<MessageDispatcher> universalDispatchers; - - /** - * @param dispatchers - * The various dispatchers we want to install. - */ - @Required - public void setDispatchers(List<MessageDispatcher> dispatchers) { - this.dispatchers = new HashMap<>(); - for (MessageDispatcher d : dispatchers) - this.dispatchers.put(d.getName(), d); - } - - /** - * @param dispatcherList - * A list of dispatch objects to always dispatch to. - */ - @Required - public void setUniversalDispatchers(List<MessageDispatcher> dispatcherList) { - this.universalDispatchers = dispatcherList; - } - - private void dispatchToChosenTarget(TavernaRun originator, String scheme, - String target, Message message) throws Exception { - try { - MessageDispatcher d = dispatchers.get(scheme); - if (d != null && d.isAvailable()) - d.dispatch(originator, message.getTitle(scheme), - message.getContent(scheme), target); - else - log.warn("no such notification dispatcher for " + scheme); - } catch (URISyntaxException e) { - // See if *someone* will handle the message - Exception e2 = null; - for (MessageDispatcher d : dispatchers.values()) - try { - if (d.isAvailable()) { - d.dispatch(originator, message.getTitle(d.getName()), - message.getContent(d.getName()), scheme + ":" - + target); - return; - } - } catch (Exception ex) { - if (log.isDebugEnabled()) - log.debug("failed in pseudo-directed dispatch of " - + scheme + ":" + target, ex); - e2 = ex; - } - if (e2 != null) - throw e2; - } - } - - private void dispatchUniversally(TavernaRun originator, Message message) - throws Exception { - for (MessageDispatcher d : universalDispatchers) - try { - if (d.isAvailable()) - d.dispatch(originator, message.getTitle(d.getName()), - message.getContent(d.getName()), null); - } catch (Exception e) { - log.warn("problem in universal dispatcher", e); - } - } - - /** - * Dispatch a message over the notification fabric. - * - * @param originator - * What workflow run was the source of this message? - * @param destination - * Where the message should get delivered to. The correct format - * of this is either as a URI of some form (where the scheme - * determines the dispatcher) or as an invalid URI in which case - * it is just tried against the possibilities to see if any - * succeeds. - * @param subject - * The subject line of the message. - * @param message - * The plain text body of the message. - * @throws Exception - * If anything goes wrong with the dispatch process. - */ - public void dispatchMessage(TavernaRun originator, String destination, - Message message) throws Exception { - if (destination != null && !destination.trim().isEmpty()) { - try { - URI toURI = new URI(destination.trim()); - dispatchToChosenTarget(originator, toURI.getScheme(), - toURI.getSchemeSpecificPart(), message); - } catch (URISyntaxException e) { - // Ignore - } - } - dispatchUniversally(originator, message); - } - - /** - * @return The message dispatchers that are actually available (i.e., not - * disabled by configuration somewhere). - */ - public List<String> listAvailableDispatchers() { - ArrayList<String> result = new ArrayList<>(); - for (Map.Entry<String, MessageDispatcher> entry : dispatchers - .entrySet()) { - if (entry.getValue().isAvailable()) - result.add(entry.getKey()); - } - return result; - } - - public interface Message { - String getContent(String type); - - String getTitle(String type); - } -}
