Repository: incubator-taverna-server Updated Branches: refs/heads/master 529e59d51 -> 8d121b2b0
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java new file mode 100644 index 0000000..adf3ea7 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java @@ -0,0 +1,769 @@ +/* + * Copyright (C) 2010-2012 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.localworker.impl; + +import static java.lang.Runtime.getRuntime; +import static java.lang.System.getProperty; +import static java.lang.System.out; +import static java.lang.management.ManagementFactory.getRuntimeMXBean; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.UUID.randomUUID; +import static org.apache.commons.io.FileUtils.forceDelete; +import static org.apache.commons.io.FileUtils.forceMkdir; +import static org.apache.commons.io.FileUtils.writeByteArrayToFile; +import static org.apache.commons.io.FileUtils.writeLines; +import static org.taverna.server.localworker.api.Constants.HELIO_TOKEN_NAME; +import static org.taverna.server.localworker.api.Constants.KEYSTORE_FILE; +import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD; +import static org.taverna.server.localworker.api.Constants.SECURITY_DIR_NAME; +import static org.taverna.server.localworker.api.Constants.SHARED_DIR_PROP; +import static org.taverna.server.localworker.api.Constants.SUBDIR_LIST; +import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING; +import static 
org.taverna.server.localworker.api.Constants.TRUSTSTORE_FILE; +import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedFile; +import static org.taverna.server.localworker.remote.RemoteStatus.Finished; +import static org.taverna.server.localworker.remote.RemoteStatus.Initialized; +import static org.taverna.server.localworker.remote.RemoteStatus.Operating; +import static org.taverna.server.localworker.remote.RemoteStatus.Stopped; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.api.WorkerFactory; +import org.taverna.server.localworker.remote.IllegalStateTransitionException; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteDirectory; +import org.taverna.server.localworker.remote.RemoteInput; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteSecurityContext; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.remote.StillWorkingOnItException; +import org.taverna.server.localworker.server.UsageRecordReceiver; + +/** + * This class implements one side of the connection between the Taverna Server + * master server and this process. It delegates to a {@link Worker} instance the + * handling of actually running a workflow. 
+ * + * @author Donal Fellows + * @see DirectoryDelegate + * @see FileDelegate + * @see WorkerCore + */ +@SuppressWarnings("serial") +public class LocalWorker extends UnicastRemoteObject implements RemoteSingleRun { + // ----------------------- CONSTANTS ----------------------- + + /** Handle to the directory containing the security info. */ + static final File SECURITY_DIR; + static final String SLASHTEMP; + static { + SLASHTEMP = getProperty("java.io.tmpdir"); + File home = new File(getProperty("user.home")); + // If we can't write to $HOME (i.e., we're in an odd deployment) use + // the official version of /tmp/$PID as a fallback. + if (!home.canWrite()) + home = new File(SLASHTEMP, getRuntimeMXBean().getName()); + SECURITY_DIR = new File(home, SECURITY_DIR_NAME); + } + + // ----------------------- VARIABLES ----------------------- + + /** + * Magic flag used to turn off problematic code when testing inside CI + * environment. + */ + static boolean DO_MKDIR = true; + + /** What to use to run a workflow engine. */ + private final String executeWorkflowCommand; + /** What workflow to run. */ + private final byte[] workflow; + /** The remote access object for the working directory. */ + private final DirectoryDelegate baseDir; + /** What inputs to pass as files. */ + final Map<String, String> inputFiles; + /** What inputs to pass as files (as file refs). */ + final Map<String, File> inputRealFiles; + /** What inputs to pass as direct values. */ + final Map<String, String> inputValues; + /** What delimiters to use. */ + final Map<String, String> inputDelimiters; + /** The interface to the workflow engine subprocess. */ + private final Worker core; + /** Our descriptor token (UUID). */ + private final String masterToken; + /** + * The root working directory for a workflow run, or <tt>null</tt> if it has + * been deleted. + */ + private File base; + /** + * When did this workflow start running, or <tt>null</tt> for + * "never/not yet". 
+ */ + private Date start; + /** + * When did this workflow finish running, or <tt>null</tt> for + * "never/not yet". + */ + private Date finish; + /** The cached status of the workflow run. */ + RemoteStatus status; + /** + * The name of the input Baclava document, or <tt>null</tt> to not do it + * that way. + */ + String inputBaclava; + /** + * The name of the output Baclava document, or <tt>null</tt> to not do it + * that way. + */ + String outputBaclava; + /** + * The file containing the input Baclava document, or <tt>null</tt> to not + * do it that way. + */ + private File inputBaclavaFile; + /** + * The file containing the output Baclava document, or <tt>null</tt> to not + * do it that way. + */ + private File outputBaclavaFile; + /** + * Registered shutdown hook so that we clean up when this process is killed + * off, or <tt>null</tt> if that is no longer necessary. + */ + Thread shutdownHook; + /** Location for security information to be written to. */ + File securityDirectory; + /** + * Password to use to encrypt security information. + */ + char[] keystorePassword = KEYSTORE_PASSWORD; + /** Additional server-specified environment settings. */ + Map<String, String> environment = new HashMap<>(); + /** Additional server-specified java runtime settings. */ + List<String> runtimeSettings = new ArrayList<>(); + URL interactionFeedURL; + URL webdavURL; + URL publishURL;//FIXME + private boolean doProvenance = true; + + // ----------------------- METHODS ----------------------- + + /** + * @param executeWorkflowCommand + * The script used to execute workflows. + * @param workflow + * The workflow to execute. + * @param workerClass + * The class to instantiate as our local representative of the + * run. + * @param urReceiver + * The remote class to report the generated usage record(s) to. + * @param id + * The UUID to use, or <tt>null</tt> if we are to invent one. 
+ * @param seedEnvironment + * The key/value pairs to seed the worker subprocess environment + * with. + * @param javaParams + * Parameters to pass to the worker subprocess java runtime + * itself. + * @param workerFactory + * How to make instances of the low-level worker objects. + * @throws RemoteException + * If registration of the worker fails. + * @throws ImplementationException + * If something goes wrong during local setup. + */ + protected LocalWorker(String executeWorkflowCommand, byte[] workflow, + UsageRecordReceiver urReceiver, UUID id, + Map<String, String> seedEnvironment, List<String> javaParams, + WorkerFactory workerFactory) throws RemoteException, + ImplementationException { + super(); + if (id == null) + id = randomUUID(); + masterToken = id.toString(); + this.workflow = workflow; + this.executeWorkflowCommand = executeWorkflowCommand; + String sharedDir = getProperty(SHARED_DIR_PROP, SLASHTEMP); + base = new File(sharedDir, masterToken); + out.println("about to create " + base); + try { + forceMkdir(base); + for (String subdir : SUBDIR_LIST) { + new File(base, subdir).mkdir(); + } + } catch (IOException e) { + throw new ImplementationException( + "problem creating run working directory", e); + } + baseDir = new DirectoryDelegate(base, null); + inputFiles = new HashMap<>(); + inputRealFiles = new HashMap<>(); + inputValues = new HashMap<>(); + inputDelimiters = new HashMap<>(); + environment.putAll(seedEnvironment); + runtimeSettings.addAll(javaParams); + try { + core = workerFactory.makeInstance(); + } catch (Exception e) { + out.println("problem when creating core worker implementation"); + e.printStackTrace(out); + throw new ImplementationException( + "problem when creating core worker implementation", e); + } + core.setURReceiver(urReceiver); + Thread t = new Thread(new Runnable() { + /** + * Kill off the worker launched by the core. 
+ */ + @Override + public void run() { + try { + shutdownHook = null; + destroy(); + } catch (ImplementationException e) { + // Absolutely nothing we can do here + } + } + }); + getRuntime().addShutdownHook(t); + shutdownHook = t; + status = Initialized; + } + + @Override + public void destroy() throws ImplementationException { + killWorkflowSubprocess(); + removeFromShutdownHooks(); + // Is this it? + deleteWorkingDirectory(); + deleteSecurityManagerDirectory(); + core.deleteLocalResources(); + } + + private void killWorkflowSubprocess() { + if (status != Finished && status != Initialized) + try { + core.killWorker(); + if (finish == null) + finish = new Date(); + } catch (Exception e) { + out.println("problem when killing worker"); + e.printStackTrace(out); + } + } + + private void removeFromShutdownHooks() throws ImplementationException { + try { + if (shutdownHook != null) + getRuntime().removeShutdownHook(shutdownHook); + } catch (RuntimeException e) { + throw new ImplementationException("problem removing shutdownHook", + e); + } finally { + shutdownHook = null; + } + } + + private void deleteWorkingDirectory() throws ImplementationException { + try { + if (base != null) + forceDelete(base); + } catch (IOException e) { + out.println("problem deleting working directory"); + e.printStackTrace(out); + throw new ImplementationException( + "problem deleting working directory", e); + } finally { + base = null; + } + } + + private void deleteSecurityManagerDirectory() + throws ImplementationException { + try { + if (securityDirectory != null) + forceDelete(securityDirectory); + } catch (IOException e) { + out.println("problem deleting security directory"); + e.printStackTrace(out); + throw new ImplementationException( + "problem deleting security directory", e); + } finally { + securityDirectory = null; + } + } + + @Override + public void addListener(RemoteListener listener) throws RemoteException, + ImplementationException { + throw new ImplementationException("not 
implemented"); + } + + @Override + public String getInputBaclavaFile() { + return inputBaclava; + } + + @Override + public List<RemoteInput> getInputs() throws RemoteException { + ArrayList<RemoteInput> result = new ArrayList<>(); + for (String name : inputFiles.keySet()) + result.add(new InputDelegate(name)); + return result; + } + + @Override + public List<String> getListenerTypes() { + return emptyList(); + } + + @Override + public List<RemoteListener> getListeners() { + return singletonList(core.getDefaultListener()); + } + + @Override + public String getOutputBaclavaFile() { + return outputBaclava; + } + + class SecurityDelegate extends UnicastRemoteObject implements + RemoteSecurityContext { + private void setPrivatePerms(File dir) { + if (!dir.setReadable(false, false) || !dir.setReadable(true, true) + || !dir.setExecutable(false, false) + || !dir.setExecutable(true, true) + || !dir.setWritable(false, false) + || !dir.setWritable(true, true)) { + out.println("warning: " + + "failed to set permissions on security context directory"); + } + } + + protected SecurityDelegate(String token) throws IOException { + super(); + if (DO_MKDIR) { + securityDirectory = new File(SECURITY_DIR, token); + forceMkdir(securityDirectory); + setPrivatePerms(securityDirectory); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. + * @param data + * The bytes to put in the file. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, byte[] data) throws RemoteException, + ImplementationException { + try { + File f = new File(securityDirectory, name); + writeByteArrayToFile(f, data); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. 
+ * @param data + * The lines to put in the file. The + * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding} + * will be used to do the writing. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, Collection<String> data) + throws RemoteException, ImplementationException { + try { + File f = new File(securityDirectory, name); + writeLines(f, SYSTEM_ENCODING, data); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. + * @param data + * The line to put in the file. The + * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding} + * will be used to do the writing. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, char[] data) throws RemoteException, + ImplementationException { + try { + File f = new File(securityDirectory, name); + writeLines(f, SYSTEM_ENCODING, asList(new String(data))); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + @Override + public void setKeystore(byte[] keystore) throws RemoteException, + ImplementationException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (keystore == null) + throw new IllegalArgumentException("keystore may not be null"); + write(KEYSTORE_FILE, keystore); + } + + @Override + public void setPassword(char[] password) throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (password == null) + throw new IllegalArgumentException("password may not be null"); + keystorePassword = password.clone(); + } + + @Override + public void setTruststore(byte[] truststore) throws RemoteException, + ImplementationException { + if (status != Initialized) + 
throw new RemoteException("not initializing"); + if (truststore == null) + throw new IllegalArgumentException("truststore may not be null"); + write(TRUSTSTORE_FILE, truststore); + } + + @Override + public void setUriToAliasMap(Map<URI, String> uriToAliasMap) + throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (uriToAliasMap == null) + return; + ArrayList<String> lines = new ArrayList<>(); + for (Entry<URI, String> site : uriToAliasMap.entrySet()) + lines.add(site.getKey().toASCIIString() + " " + site.getValue()); + // write(URI_ALIAS_MAP, lines); + } + + @Override + public void setHelioToken(String helioToken) throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + out.println("registering HELIO CIS token for export"); + environment.put(HELIO_TOKEN_NAME, helioToken); + } + } + + @Override + public RemoteSecurityContext getSecurityContext() throws RemoteException, + ImplementationException { + try { + return new SecurityDelegate(masterToken); + } catch (RemoteException e) { + if (e.getCause() != null) + throw new ImplementationException( + "problem initializing security context", e.getCause()); + throw e; + } catch (IOException e) { + throw new ImplementationException( + "problem initializing security context", e); + } + } + + @Override + public RemoteStatus getStatus() { + // only state that can spontaneously change to another + if (status == Operating) { + status = core.getWorkerStatus(); + if (status == Finished && finish == null) + finish = new Date(); + } + return status; + } + + @Override + public RemoteDirectory getWorkingDirectory() { + return baseDir; + } + + File validateFilename(String filename) throws RemoteException { + if (filename == null) + throw new IllegalArgumentException("filename must be non-null"); + try { + return getValidatedFile(base, filename.split("/")); + } catch (IOException e) { + throw new IllegalArgumentException("failed to 
validate filename", e); + } + } + + class InputDelegate extends UnicastRemoteObject implements RemoteInput { + private String name; + + InputDelegate(String name) throws RemoteException { + super(); + this.name = name; + if (!inputFiles.containsKey(name)) { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputFiles.put(name, null); + inputRealFiles.put(name, null); + inputValues.put(name, null); + inputDelimiters.put(name, null); + } + } + + @Override + public String getFile() { + return inputFiles.get(name); + } + + @Override + public String getName() { + return name; + } + + @Override + public String getValue() { + return inputValues.get(name); + } + + @Override + public String getDelimiter() throws RemoteException { + return inputDelimiters.get(name); + } + + @Override + public void setFile(String file) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputRealFiles.put(name, validateFilename(file)); + inputValues.put(name, null); + inputFiles.put(name, file); + inputBaclava = null; + } + + @Override + public void setValue(String value) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputValues.put(name, value); + inputFiles.put(name, null); + inputRealFiles.put(name, null); + inputBaclava = null; + } + + @Override + public void setDelimiter(String delimiter) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + if (inputBaclava != null) + throw new IllegalStateException("input baclava file set"); + if (delimiter != null) { + if (delimiter.length() > 1) + throw new IllegalStateException( + "multi-character delimiter not permitted"); + if (delimiter.charAt(0) == 0) + throw new IllegalStateException( + "may not use NUL for splitting"); + if (delimiter.charAt(0) > 127) + throw new IllegalStateException( + "only ASCII characters supported for 
splitting"); + } + inputDelimiters.put(name, delimiter); + } + } + + @Override + public RemoteInput makeInput(String name) throws RemoteException { + return new InputDelegate(name); + } + + @Override + public RemoteListener makeListener(String type, String configuration) + throws RemoteException { + throw new RemoteException("listener manufacturing unsupported"); + } + + @Override + public void setInputBaclavaFile(String filename) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputBaclavaFile = validateFilename(filename); + for (String input : inputFiles.keySet()) { + inputFiles.put(input, null); + inputRealFiles.put(input, null); + inputValues.put(input, null); + } + inputBaclava = filename; + } + + @Override + public void setOutputBaclavaFile(String filename) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + if (filename != null) + outputBaclavaFile = validateFilename(filename); + else + outputBaclavaFile = null; + outputBaclava = filename; + } + + @Override + public void setGenerateProvenance(boolean prov) { + doProvenance = prov; + } + + @Override + public void setStatus(RemoteStatus newStatus) + throws IllegalStateTransitionException, RemoteException, + ImplementationException, StillWorkingOnItException { + if (status == newStatus) + return; + + switch (newStatus) { + case Initialized: + throw new IllegalStateTransitionException( + "may not move back to start"); + case Operating: + switch (status) { + case Initialized: + boolean started; + try { + started = createWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem creating executing workflow", e); + } + if (!started) + throw new StillWorkingOnItException( + "workflow start in process"); + break; + case Stopped: + try { + core.startWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem continuing workflow run", e); + } + break; 
+ case Finished: + throw new IllegalStateTransitionException("already finished"); + default: + break; + } + status = Operating; + break; + case Stopped: + switch (status) { + case Initialized: + throw new IllegalStateTransitionException( + "may only stop from Operating"); + case Operating: + try { + core.stopWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem stopping workflow run", e); + } + break; + case Finished: + throw new IllegalStateTransitionException("already finished"); + default: + break; + } + status = Stopped; + break; + case Finished: + switch (status) { + case Operating: + case Stopped: + try { + core.killWorker(); + if (finish == null) + finish = new Date(); + } catch (Exception e) { + throw new ImplementationException( + "problem killing workflow run", e); + } + default: + break; + } + status = Finished; + break; + } + } + + private boolean createWorker() throws Exception { + start = new Date(); + char[] pw = keystorePassword; + keystorePassword = null; + /* + * Do not clear the keystorePassword array here; its ownership is + * *transferred* to the worker core which doesn't copy it but *does* + * clear it after use. + */ + return core.initWorker(this, executeWorkflowCommand, workflow, base, + inputBaclavaFile, inputRealFiles, inputValues, inputDelimiters, + outputBaclavaFile, securityDirectory, pw, doProvenance, + environment, masterToken, runtimeSettings); + } + + @Override + public Date getFinishTimestamp() { + return finish == null ? null : new Date(finish.getTime()); + } + + @Override + public Date getStartTimestamp() { + return start == null ? 
null : new Date(start.getTime()); + } + + @Override + public void setInteractionServiceDetails(URL feed, URL webdav, URL publish) { + interactionFeedURL = feed; + webdavURL = webdav; + publishURL = publish; + } + + @Override + public void ping() { + // Do nothing here; this *should* be empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java new file mode 100644 index 0000000..03ee69d --- /dev/null +++ b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java @@ -0,0 +1,243 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. 
+ */ +package org.taverna.server.localworker.impl; + +import static java.lang.Runtime.getRuntime; +import static java.lang.System.exit; +import static java.lang.System.getProperty; +import static java.lang.System.out; +import static java.lang.System.setProperty; +import static java.lang.System.setSecurityManager; +import static java.rmi.registry.LocateRegistry.getRegistry; +import static org.taverna.server.localworker.api.Constants.DEATH_DELAY; +import static org.taverna.server.localworker.api.Constants.LOCALHOST; +import static org.taverna.server.localworker.api.Constants.RMI_HOST_PROP; +import static org.taverna.server.localworker.api.Constants.SECURITY_POLICY_FILE; +import static org.taverna.server.localworker.api.Constants.SEC_POLICY_PROP; +import static org.taverna.server.localworker.api.Constants.UNSECURE_PROP; + +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.rmi.RMISecurityManager; +import java.rmi.RemoteException; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.taverna.server.localworker.api.RunAccounting; +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.api.WorkerFactory; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.server.UsageRecordReceiver; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import uk.org.taverna.scufl2.api.io.WorkflowBundleIO; + +/** + * The registered factory for runs, this class is responsible for constructing + * runs that are suitable for particular users. It is also the entry point for + * this whole process. 
+ * + * @author Donal Fellows + * @see LocalWorker + */ +@SuppressWarnings("serial") +public class TavernaRunManager extends UnicastRemoteObject implements + RemoteRunFactory, RunAccounting, WorkerFactory { + String command; + Map<String, String> seedEnvironment = new HashMap<>(); + List<String> javaInitParams = new ArrayList<>(); + private WorkflowBundleIO io; + private int activeRuns = 0; + // Hacks! + public static String interactionHost; + public static String interactionPort; + public static String interactionWebdavPath; + public static String interactionFeedPath; + + /** + * How to get the actual workflow document from the XML document that it is + * contained in. + * + * @param containerDocument + * The document sent from the web interface. + * @return The element describing the workflow, as expected by the Taverna + * command line executor. + */ + protected Element unwrapWorkflow(Document containerDocument) { + return (Element) containerDocument.getDocumentElement().getFirstChild(); + } + + private static final String usage = "java -jar server.worker.jar workflowExecScript ?-Ekey=val...? ?-Jconfig? UUID"; + + /** + * An RMI-enabled factory for runs. + * + * @param command + * What command to call to actually run a run. + * @throws RemoteException + * If anything goes wrong during creation of the instance. 
+ */ + public TavernaRunManager(String command) throws RemoteException { + this.command = command; + this.io = new WorkflowBundleIO(); + } + + @Override + public RemoteSingleRun make(byte[] workflow, String creator, + UsageRecordReceiver urReceiver, UUID id) throws RemoteException { + if (creator == null) + throw new RemoteException("no creator"); + try { + URI wfid = io.readBundle(new ByteArrayInputStream(workflow), null) + .getMainWorkflow().getWorkflowIdentifier(); + out.println("Creating run from workflow <" + wfid + "> for <" + + creator + ">"); + return new LocalWorker(command, workflow, urReceiver, id, + seedEnvironment, javaInitParams, this); + } catch (RemoteException e) { + throw e; + } catch (Exception e) { + throw new RemoteException("bad instance construction", e); + } + } + + private static boolean shuttingDown; + private static String factoryName; + private static Registry registry; + + static synchronized void unregisterFactory() { + if (!shuttingDown) { + shuttingDown = true; + try { + if (factoryName != null && registry != null) + registry.unbind(factoryName); + } catch (Exception e) { + e.printStackTrace(out); + } + } + } + + @Override + public void shutdown() { + unregisterFactory(); + new Thread(new DelayedDeath()).start(); + } + + static class DelayedDeath implements Runnable { + @Override + public void run() { + try { + Thread.sleep(DEATH_DELAY); + } catch (InterruptedException e) { + } finally { + exit(0); + } + } + } + + private void addArgument(String arg) { + if (arg.startsWith("-E")) { + String trimmed = arg.substring(2); + int idx = trimmed.indexOf('='); + if (idx > 0) { + addEnvironmentDefinition(trimmed.substring(0, idx), + trimmed.substring(idx + 1)); + return; + } + } else if (arg.startsWith("-D")) { + if (arg.indexOf('=') > 0) { + addJavaParameter(arg); + return; + } + } else if (arg.startsWith("-J")) { + addJavaParameter(arg.substring(2)); + return; + } + throw new IllegalArgumentException("argument \"" + arg + + "\" must start 
with -D, -E or -J; " + + "-D and -E must contain a \"=\""); + } + + /** + * @param args + * The arguments from the command line invocation. + * @throws Exception + * If we can't connect to the RMI registry, or if we can't read + * the workflow, or if we can't build the worker instance, or + * register it. Also if the arguments are wrong. + */ + public static void main(String[] args) throws Exception { + if (args.length < 2) + throw new Exception("wrong # args: must be \"" + usage + "\""); + if (!getProperty(UNSECURE_PROP, "no").equals("yes")) { + setProperty(SEC_POLICY_PROP, LocalWorker.class.getClassLoader() + .getResource(SECURITY_POLICY_FILE).toExternalForm()); + setProperty(RMI_HOST_PROP, LOCALHOST); + } + setSecurityManager(new RMISecurityManager()); + factoryName = args[args.length - 1]; + TavernaRunManager man = new TavernaRunManager(args[0]); + for (int i = 1; i < args.length - 1; i++) + man.addArgument(args[i]); + registry = getRegistry(LOCALHOST); + + registry.bind(factoryName, man); + getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + unregisterFactory(); + } + }); + out.println("registered RemoteRunFactory with ID " + factoryName); + } + + private void addJavaParameter(String string) { + this.javaInitParams.add(string); + } + + private void addEnvironmentDefinition(String key, String value) { + this.seedEnvironment.put(key, value); + } + + @Override + public void setInteractionServiceDetails(String host, String port, + String webdavPath, String feedPath) throws RemoteException { + if (host == null || port == null || webdavPath == null + || feedPath == null) + throw new IllegalArgumentException("all params must be non-null"); + interactionHost = host; + interactionPort = port; + interactionWebdavPath = webdavPath; + interactionFeedPath = feedPath; + } + + @Override + public synchronized int countOperatingRuns() { + return (activeRuns < 0 ? 
0 : activeRuns); + } + + @Override + public synchronized void runStarted() { + activeRuns++; + } + + @Override + public synchronized void runCeased() { + activeRuns--; + } + + @Override + public Worker makeInstance() throws Exception { + return new WorkerCore(this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java new file mode 100644 index 0000000..1a6cff8 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java @@ -0,0 +1,918 @@ +/* + * Copyright (C) 2010-2012 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.localworker.impl; + +import static java.io.File.createTempFile; +import static java.io.File.pathSeparator; +import static java.lang.Boolean.parseBoolean; +import static java.lang.Double.parseDouble; +import static java.lang.Integer.parseInt; +import static java.lang.Long.parseLong; +import static java.lang.Runtime.getRuntime; +import static java.lang.System.out; +import static java.net.InetAddress.getLocalHost; +import static org.apache.commons.io.FileUtils.forceDelete; +import static org.apache.commons.io.FileUtils.sizeOfDirectory; +import static org.apache.commons.io.FileUtils.write; +import static org.apache.commons.io.IOUtils.copy; +import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_DIRECTORY; +import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_PASSWORD; +import static org.taverna.server.localworker.api.Constants.DEATH_TIME; +import static org.taverna.server.localworker.api.Constants.DEFAULT_LISTENER_NAME; 
+import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD; +import static org.taverna.server.localworker.api.Constants.START_WAIT_TIME; +import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING; +import static org.taverna.server.localworker.api.Constants.TIME; +import static org.taverna.server.localworker.impl.Status.Aborted; +import static org.taverna.server.localworker.impl.Status.Completed; +import static org.taverna.server.localworker.impl.Status.Failed; +import static org.taverna.server.localworker.impl.Status.Held; +import static org.taverna.server.localworker.impl.Status.Started; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionFeedPath; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionHost; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionPort; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionWebdavPath; +import static org.taverna.server.localworker.impl.WorkerCore.pmap; +import static org.taverna.server.localworker.remote.RemoteStatus.Finished; +import static org.taverna.server.localworker.remote.RemoteStatus.Initialized; +import static org.taverna.server.localworker.remote.RemoteStatus.Operating; +import static org.taverna.server.localworker.remote.RemoteStatus.Stopped; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.Arrays; +import java.util.Date; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.ws.Holder; + +import org.ogf.usage.JobUsageRecord; +import org.taverna.server.localworker.api.RunAccounting; +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.impl.utils.TimingOutTask; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.server.UsageRecordReceiver; + +/** + * The core class that connects to a Taverna command-line workflow execution + * engine. This implementation always registers a single listener, &lquo; + * <tt>io</tt> &rquo;, with two properties representing the stdout and stderr of + * the run and one representing the exit code. The listener is + * remote-accessible. It does not support attaching any other listeners. + * + * @author Donal Fellows + */ +@SuppressWarnings("serial") +public class WorkerCore extends UnicastRemoteObject implements Worker, + RemoteListener { + @Nonnull + static final Map<String, Property> pmap = new HashMap<>(); + /** + * Regular expression to extract the detailed timing information from the + * output of /usr/bin/time + */ + @Nonnull + private static final Pattern TimeRE; + static { + final String TIMERE = "([0-9.:]+)"; + final String TERMS = "(real|user|system|sys|elapsed)"; + TimeRE = Pattern.compile(TIMERE + " *" + TERMS + "[ \t]*" + TIMERE + + " *" + TERMS + "[ \t]*" + TIMERE + " *" + TERMS); + } + + /** + * Environment variables to remove before any fork (because they're large or + * potentially leaky). 
+ */ + // TODO Conduct a proper survey of what to remove + @Nonnull + private static final String[] ENVIRONMENT_TO_REMOVE = { "SUDO_COMMAND", + "SUDO_USER", "SUDO_GID", "SUDO_UID", "DISPLAY", "LS_COLORS", + "XFILESEARCHPATH", "SSH_AGENT_PID", "SSH_AUTH_SOCK" }; + + @Nullable + Process subprocess; + @Nonnull + final StringWriter stdout; + @Nonnull + final StringWriter stderr; + @Nullable + Integer exitCode; + boolean readyToSendEmail; + @Nullable + String emailAddress; + @Nullable + Date start; + @Nonnull + final RunAccounting accounting; + @Nonnull + final Holder<Integer> pid; + + private boolean finished; + @Nullable + private JobUsageRecord ur; + @Nullable + private File wd; + @Nullable + private UsageRecordReceiver urreceiver; + @Nullable + private File workflowFile; + private boolean stopped; + + /** + * @param accounting + * Object that looks after how many runs are executing. + * @throws RemoteException + */ + public WorkerCore(@Nonnull RunAccounting accounting) throws RemoteException { + super(); + stdout = new StringWriter(); + stderr = new StringWriter(); + pid = new Holder<>(); + this.accounting = accounting; + } + + private int getPID() { + synchronized (pid) { + if (pid.value == null) + return -1; + return pid.value; + } + } + + /** + * Fire up the workflow. This causes a transition into the operating state. + * + * @param executeWorkflowCommand + * The command to run to execute the workflow. + * @param workflow + * The workflow document to execute. + * @param workingDir + * What directory to use as the working directory. + * @param inputBaclava + * The baclava file to use for inputs, or <tt>null</tt> to use + * the other <b>input*</b> arguments' values. + * @param inputFiles + * A mapping of input names to files that supply them. Note that + * we assume that nothing mapped here will be mapped in + * <b>inputValues</b>. + * @param inputValues + * A mapping of input names to values to supply to them. 
Note + * that we assume that nothing mapped here will be mapped in + * <b>inputFiles</b>. + * @param outputBaclava + * What baclava file to write the output from the workflow into, + * or <tt>null</tt> to have it written into the <tt>out</tt> + * subdirectory. + * @param token + * The name of the workflow run. + * @return <tt>true</tt> if the worker started, or <tt>false</tt> if a + * timeout occurred. + * @throws IOException + * If any of quite a large number of things goes wrong. + */ + @Override + public boolean initWorker( + @Nonnull final LocalWorker local, + @Nonnull final String executeWorkflowCommand, + @Nonnull final byte[] workflow, + @Nonnull final File workingDir, + @Nullable final File inputBaclava, + @Nonnull final Map<String, File> inputFiles, + @Nonnull final Map<String, String> inputValues, + @Nonnull final Map<String, String> inputDelimiters, + @Nullable final File outputBaclava, + @Nonnull final File securityDir, + @Nullable final char[] password, + final boolean generateProvenance, + @Nonnull final Map<String, String> environment, + @Nullable final String token, + @Nonnull final List<String> runtime) throws IOException { + try { + new TimingOutTask() { + @Override + public void doIt() throws IOException { + startExecutorSubprocess( + createProcessBuilder(local, executeWorkflowCommand, + workflow, workingDir, inputBaclava, + inputFiles, inputValues, inputDelimiters, + outputBaclava, securityDir, password, + generateProvenance, environment, token, + runtime), password); + } + }.doOrTimeOut(START_WAIT_TIME); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + return subprocess != null; + } + + private void startExecutorSubprocess(@Nonnull ProcessBuilder pb, + @Nullable char[] password) throws IOException { + // Start the subprocess + out.println("starting " + pb.command() + " in directory " + + pb.directory() + " with environment " + pb.environment()); + subprocess = pb.start(); + if (subprocess == 
null) + throw new IOException("unknown failure creating process"); + start = new Date(); + accounting.runStarted(); + + // Capture its stdout and stderr + new AsyncCopy(subprocess.getInputStream(), stdout, pid); + new AsyncCopy(subprocess.getErrorStream(), stderr); + if (password != null) + new PasswordWriterThread(subprocess, password); + } + + /** + * Assemble the process builder. Does not launch the subprocess. + * + * @param local + * The local worker container. + * @param executeWorkflowCommand + * The reference to the workflow engine implementation. + * @param workflow + * The workflow to execute. + * @param workingDir + * The working directory to use. + * @param inputBaclava + * What file to read a baclava document from (or <tt>null</tt>) + * @param inputFiles + * The mapping from inputs to files. + * @param inputValues + * The mapping from inputs to literal values. + * @param outputBaclava + * What file to write a baclava document to (or <tt>null</tt>) + * @param securityDir + * The credential manager directory. + * @param password + * The password for the credential manager. + * @param environment + * The seed environment + * @param token + * The run identifier that the server wants to use. + * @param runtime + * Any runtime parameters to Java. + * @return The configured process builder. 
+ * @throws IOException + * If file handling fails + * @throws UnsupportedEncodingException + * If we can't encode any text (unlikely) + * @throws FileNotFoundException + * If we can't write the workflow out (unlikely) + */ + @Nonnull + ProcessBuilder createProcessBuilder(@Nonnull LocalWorker local, + @Nonnull String executeWorkflowCommand, @Nonnull byte[] workflow, + @Nonnull File workingDir, @Nullable File inputBaclava, + @Nonnull Map<String, File> inputFiles, + @Nonnull Map<String, String> inputValues, + @Nonnull Map<String, String> inputDelimiters, + @Nullable File outputBaclava, @Nonnull File securityDir, + @Nonnull char[] password, boolean generateProvenance, + @Nonnull Map<String, String> environment, @Nonnull String token, + @Nonnull List<String> runtime) throws IOException, + UnsupportedEncodingException, FileNotFoundException { + ProcessBuilder pb = new ProcessBuilder(); + pb.command().add(TIME); + /* + * WARNING! HERE THERE BE DRAGONS! BE CAREFUL HERE! + * + * Work around _Maven_ bug with permissions in zip files! The executable + * bit is stripped by Maven's handling of file permissions, and there's + * no practical way to work around it without massively increasing the + * pain in other ways. Only want this on Unix - Windows isn't affected + * by this - so we use the file separator as a proxy for whether this is + * a true POSIX system. Ugly! Ugly ugly ugly... + * + * http://jira.codehaus.org/browse/MASSEMBLY-337 is relevant, but not + * the whole story as we don't want to use a non-standard packaging + * method as there's a real chance of it going wrong in an unexpected + * way then. Other parts of the story are that the executable bit isn't + * preserved when unpacking with the dependency plugin, and there's no + * way to be sure that the servlet container will preserve the bit + * either (as that's probably using a Java-based ZIP engine). 
+ */ + if (File.separatorChar == '/') + pb.command().add("/bin/sh"); + pb.command().add(executeWorkflowCommand); + if (runtime != null) + pb.command().addAll(runtime); + + // Enable verbose logging + pb.command().add("-logfile"); + pb.command().add( + new File(new File(workingDir, "logs"), "detail.log") + .getAbsolutePath()); + + if (securityDir != null) { + pb.command().add(CREDENTIAL_MANAGER_DIRECTORY); + pb.command().add(securityDir.getAbsolutePath()); + out.println("security dir location: " + securityDir); + } + if (password != null) { + pb.command().add(CREDENTIAL_MANAGER_PASSWORD); + out.println("password of length " + password.length + + " will be written to subprocess stdin"); + } + + // Add arguments denoting inputs + if (inputBaclava != null) { + pb.command().add("-inputdoc"); + pb.command().add(inputBaclava.getAbsolutePath()); + if (!inputBaclava.exists()) + throw new IOException("input baclava file doesn't exist"); + } else { + for (Entry<String, File> port : inputFiles.entrySet()) { + if (port.getValue() == null) + continue; + pb.command().add("-inputfile"); + pb.command().add(port.getKey()); + pb.command().add(port.getValue().getAbsolutePath()); + if (!port.getValue().exists()) + throw new IOException("input file for port \"" + port + + "\" doesn't exist"); + } + for (Entry<String, String> port : inputValues.entrySet()) { + if (port.getValue() == null) + continue; + pb.command().add("-inputfile"); + pb.command().add(port.getKey()); + File f = createTempFile(".tav_in_", null, workingDir); + pb.command().add(f.getAbsolutePath()); + write(f, port.getValue(), "UTF-8"); + } + for (Entry<String, String> delim : inputDelimiters.entrySet()) { + if (delim.getValue() == null) + continue; + pb.command().add("-inputdelimiter"); + pb.command().add(delim.getKey()); + pb.command().add(delim.getValue()); + } + } + + // Add arguments denoting outputs + if (outputBaclava != null) { + pb.command().add("-outputdoc"); + pb.command().add(outputBaclava.getAbsolutePath()); + 
if (!outputBaclava.getParentFile().exists()) + throw new IOException( + "parent directory of output baclava file does not exist"); + if (outputBaclava.exists()) + throw new IOException("output baclava file exists"); + // Provenance cannot be supported when using baclava output + } else { + File out = new File(workingDir, "out"); + if (!out.mkdir()) + throw new IOException("failed to make output directory \"out\""); + // Taverna needs the dir to *not* exist now + forceDelete(out); + pb.command().add("-outputdir"); + pb.command().add(out.getAbsolutePath()); + // Enable provenance generation + if (generateProvenance) { + pb.command().add("-embedded"); + pb.command().add("-provenance"); + pb.command().add("-provbundle"); + pb.command().add("out.bundle.zip"); + } + } + + // Add an argument holding the workflow + File tmp = createTempFile(".wf_", ".scufl2", workingDir); + try (OutputStream os = new FileOutputStream(tmp)) { + os.write(workflow); + } + pb.command().add(workflowFile.getAbsolutePath()); + + // Indicate what working directory to use + pb.directory(workingDir); + wd = workingDir; + + Map<String, String> env = pb.environment(); + for (String name : ENVIRONMENT_TO_REMOVE) + env.remove(name); + + // Merge any options we have had imposed on us from outside + env.putAll(environment); + + // Patch the environment to deal with TAVUTILS-17 + assert env.get("PATH") != null; + env.put("PATH", new File(System.getProperty("java.home"), "bin") + + pathSeparator + env.get("PATH")); + // Patch the environment to deal with TAVSERV-189 + env.put("TAVERNA_APPHOME", workingDir.getCanonicalPath()); + // Patch the environment to deal with TAVSERV-224 + env.put("TAVERNA_RUN_ID", token); + if (interactionHost != null || local.interactionFeedURL != null + || local.webdavURL != null) { + env.put("INTERACTION_HOST", makeInterHost(local.interactionFeedURL)); + env.put("INTERACTION_PORT", makeInterPort(local.interactionFeedURL)); + env.put("INTERACTION_FEED", 
makeInterPath(local.interactionFeedURL)); + env.put("INTERACTION_WEBDAV", + local.webdavURL != null ? local.webdavURL.getPath() + : interactionWebdavPath); + String pub = makeInterPublish(local.publishURL); + if (pub != null && !pub.isEmpty()) + env.put("INTERACTION_PUBLISH", pub); + } + return pb; + } + + @Nullable + private static String makeInterHost(@Nullable URL url) { + if (url == null) + return interactionHost; + return url.getProtocol() + "://" + url.getHost(); + } + + @Nullable + private static String makeInterPort(@Nullable URL url) { + if (url == null) + return interactionPort; + int port = url.getPort(); + if (port == -1) + port = url.getDefaultPort(); + return Integer.toString(port); + } + + @Nullable + private static String makeInterPublish(@Nullable URL url) + throws IOException { + if (url == null) + return null; + try { + URI uri = url.toURI(); + int port = uri.getPort(); + if (port == -1) + return uri.getScheme() + "://" + uri.getHost(); + else + return uri.getScheme() + "://" + uri.getHost() + ":" + port; + } catch (URISyntaxException e) { + throw new IOException("problem constructing publication url", e); + } + } + + @Nullable + private static String makeInterPath(@Nullable URL url) { + if (url == null) + return interactionFeedPath; + return url.getPath(); + } + + /** + * Kills off the subprocess if it exists and is alive. + */ + @Override + public void killWorker() { + if (!finished && subprocess != null) { + final Holder<Integer> code = new Holder<>(); + for (TimingOutTask tot : new TimingOutTask[] { new TimingOutTask() { + /** Check if the workflow terminated of its own accord */ + @Override + public void doIt() throws IOException { + code.value = subprocess.exitValue(); + accounting.runCeased(); + buildUR(code.value == 0 ? 
Completed : Failed, code.value); + } + }, new TimingOutTask() { + /** Tell the workflow to stop */ + @Override + public void doIt() throws IOException { + code.value = killNicely(); + accounting.runCeased(); + buildUR(code.value == 0 ? Completed : Aborted, code.value); + } + }, new TimingOutTask() { + /** Kill the workflow, kill it with fire */ + @Override + public void doIt() throws IOException { + code.value = killHard(); + accounting.runCeased(); + buildUR(code.value == 0 ? Completed : Aborted, code.value); + } + } }) { + try { + tot.doOrTimeOut(DEATH_TIME); + } catch (Exception e) { + } + if (code.value != null) + break; + } + finished = true; + setExitCode(code.value); + readyToSendEmail = true; + } + } + + /** + * Integrated spot to handle writing/logging of the exit code. + * + * @param code + * The exit code. + */ + private void setExitCode(int code) { + exitCode = code; + if (code > 256 - 8) { + out.println("workflow aborted, Raven issue = " + (code - 256)); + } else if (code > 128) { + out.println("workflow aborted, signal=" + (code - 128)); + } else { + out.println("workflow exited, code=" + code); + } + } + + @Nonnull + private JobUsageRecord newUR() throws DatatypeConfigurationException { + try { + if (wd != null) + return new JobUsageRecord(wd.getName()); + } catch (RuntimeException e) { + } + return new JobUsageRecord("unknown"); + } + + /** + * Fills in the accounting information from the exit code and stderr. + * + * @param exitCode + * The exit code from the program. 
+ */ + private void buildUR(@Nonnull Status status, int exitCode) { + try { + Date now = new Date(); + long user = -1, sys = -1, real = -1; + Matcher m = TimeRE.matcher(stderr.toString()); + ur = newUR(); + while (m.find()) + for (int i = 1; i < 6; i += 2) + if (m.group(i + 1).equals("user")) + user = parseDuration(m.group(i)); + else if (m.group(i + 1).equals("sys") + || m.group(i + 1).equals("system")) + sys = parseDuration(m.group(i)); + else if (m.group(i + 1).equals("real") + || m.group(i + 1).equals("elapsed")) + real = parseDuration(m.group(i)); + if (user != -1) + ur.addCpuDuration(user).setUsageType("user"); + if (sys != -1) + ur.addCpuDuration(sys).setUsageType("system"); + ur.addUser(System.getProperty("user.name"), null); + ur.addStartAndEnd(start, now); + if (real != -1) + ur.addWallDuration(real); + else + ur.addWallDuration(now.getTime() - start.getTime()); + ur.setStatus(status.toString()); + ur.addHost(getLocalHost().getHostName()); + ur.addResource("exitcode", Integer.toString(exitCode)); + ur.addDisk(sizeOfDirectory(wd)).setStorageUnit("B"); + if (urreceiver != null) + urreceiver.acceptUsageRecord(ur.marshal()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private long parseDuration(@Nonnull String durationString) { + try { + return (long) (parseDouble(durationString) * 1000); + } catch (NumberFormatException nfe) { + // Not a double; maybe MM:SS.mm or HH:MM:SS.mm + } + long dur = 0; + for (String d : durationString.split(":")) + try { + dur = 60 * dur + parseLong(d); + } catch (NumberFormatException nfe) { + // Assume that only one thing is fractional, and that it is last + return 60000 * dur + (long) (parseDouble(d) * 1000); + } + return dur * 1000; + } + + private void signal(@Nonnull String signal) throws Exception { + int pid = getPID(); + if (pid > 0 + && getRuntime().exec("kill -" + signal + " " + pid).waitFor() == 0) + return; + throw new Exception("failed to send signal " + signal + " to process " + + pid); + } + + 
@Nullable + private Integer killNicely() { + try { + signal("TERM"); + return subprocess.waitFor(); + } catch (Exception e) { + return null; + } + } + + @Nullable + private Integer killHard() { + try { + signal("QUIT"); + return subprocess.waitFor(); + } catch (Exception e) { + return null; + } + } + + /** + * Move the worker out of the stopped state and back to operating. + * + * @throws Exception + * if it fails. + */ + @Override + public void startWorker() throws Exception { + signal("CONT"); + stopped = false; + } + + /** + * Move the worker into the stopped state from the operating state. + * + * @throws Exception + * if it fails. + */ + @Override + public void stopWorker() throws Exception { + signal("STOP"); + stopped = true; + } + + /** + * @return The status of the workflow run. Note that this can be an + * expensive operation. + */ + @Override + public RemoteStatus getWorkerStatus() { + if (subprocess == null) + return Initialized; + if (finished) + return Finished; + try { + setExitCode(subprocess.exitValue()); + } catch (IllegalThreadStateException e) { + if (stopped) + return Stopped; + return Operating; + } + finished = true; + readyToSendEmail = true; + accounting.runCeased(); + buildUR(exitCode.intValue() == 0 ? Completed : Failed, exitCode); + return Finished; + } + + @Override + public String getConfiguration() { + return ""; + } + + @Override + public String getName() { + return DEFAULT_LISTENER_NAME; + } + + @Override + public String getProperty(String propName) throws RemoteException { + switch (Property.is(propName)) { + case STDOUT: + return stdout.toString(); + case STDERR: + return stderr.toString(); + case EXIT_CODE: + return (exitCode == null) ? 
"" : exitCode.toString(); + case EMAIL: + return emailAddress; + case READY_TO_NOTIFY: + return Boolean.toString(readyToSendEmail); + case USAGE: + try { + JobUsageRecord toReturn; + if (subprocess == null) { + toReturn = newUR(); + toReturn.setStatus(Held.toString()); + } else if (ur == null) { + toReturn = newUR(); + toReturn.setStatus(Started.toString()); + toReturn.addStartAndEnd(start, new Date()); + toReturn.addUser(System.getProperty("user.name"), null); + } else { + toReturn = ur; + } + /* + * Note that this record is not to be pushed to the server. That + * is done elsewhere (when a proper record is produced) + */ + return toReturn.marshal(); + } catch (Exception e) { + e.printStackTrace(); + return ""; + } + default: + throw new RemoteException("unknown property"); + } + } + + @Override + public String getType() { + return DEFAULT_LISTENER_NAME; + } + + @Override + public String[] listProperties() { + return Property.names(); + } + + @Override + public void setProperty(String propName, String value) + throws RemoteException { + switch (Property.is(propName)) { + case EMAIL: + emailAddress = value; + return; + case READY_TO_NOTIFY: + readyToSendEmail = parseBoolean(value); + return; + case STDOUT: + case STDERR: + case EXIT_CODE: + case USAGE: + throw new RemoteException("property is read only"); + default: + throw new RemoteException("unknown property"); + } + } + + @Override + public RemoteListener getDefaultListener() { + return this; + } + + @Override + public void setURReceiver(@Nonnull UsageRecordReceiver receiver) { + urreceiver = receiver; + } + + @Override + public void deleteLocalResources() throws ImplementationException { + try { + if (workflowFile != null && workflowFile.getParentFile().exists()) + forceDelete(workflowFile); + } catch (IOException e) { + throw new ImplementationException("problem deleting workflow file", + e); + } + } +} + +/** + * An engine for asynchronously copying from an {@link InputStream} to a + * {@link Writer}. 
+ * + * @author Donal Fellows + */ +class AsyncCopy extends Thread { + @Nonnull + private BufferedReader from; + @Nonnull + private Writer to; + @Nullable + private Holder<Integer> pidHolder; + + AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to) + throws UnsupportedEncodingException { + this(from, to, null); + } + + AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to, + @Nullable Holder<Integer> pid) throws UnsupportedEncodingException { + this.from = new BufferedReader(new InputStreamReader(from, + SYSTEM_ENCODING)); + this.to = to; + this.pidHolder = pid; + setDaemon(true); + start(); + } + + @Override + public void run() { + try { + if (pidHolder != null) { + String line = from.readLine(); + if (line.matches("^pid:\\d+$")) + synchronized (pidHolder) { + pidHolder.value = parseInt(line.substring(4)); + } + else + to.write(line + System.getProperty("line.separator")); + } + copy(from, to); + } catch (IOException e) { + } + } +} + +/** + * A helper for asynchronously writing a password to a subprocess's stdin. + * + * @author Donal Fellows + */ +class PasswordWriterThread extends Thread { + private OutputStream to; + private char[] chars; + + PasswordWriterThread(@Nonnull Process to, @Nonnull char[] chars) { + this.to = to.getOutputStream(); + assert chars != null; + this.chars = chars; + setDaemon(true); + start(); + } + + @Override + public void run() { + try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(to, + SYSTEM_ENCODING))) { + pw.println(chars); + } catch (UnsupportedEncodingException e) { + // Not much we can do here + e.printStackTrace(); + } finally { + /* + * We don't trust GC to clear password from memory. We also take + * care not to clear the default password! 
+ */ + if (chars != KEYSTORE_PASSWORD) + Arrays.fill(chars, '\00'); + } + } +} + +enum Property { + STDOUT("stdout"), STDERR("stderr"), EXIT_CODE("exitcode"), READY_TO_NOTIFY( + "readyToNotify"), EMAIL("notificationAddress"), USAGE("usageRecord"); + + private String s; + + private Property(String s) { + this.s = s; + pmap.put(s, this); + } + + @Override + public String toString() { + return s; + } + + public static Property is(@Nonnull String s) { + return pmap.get(s); + } + + @Nonnull + public static String[] names() { + return pmap.keySet().toArray(new String[pmap.size()]); + } +} + +enum Status { + Aborted, Completed, Failed, Held, Queued, Started, Suspended +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java new file mode 100644 index 0000000..fbc3a72 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2010-2011 The University of Manchester + * + * See the file "LICENSE" for license terms. + */ +package org.taverna.server.localworker.impl.utils; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Utility class that handles filename validation on different target platforms. + * + * @author Donal Fellows. 
+ */ +public abstract class FilenameVerifier { + private FilenameVerifier(){} + + static final boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().contains("win"); + + @SuppressWarnings("serial") + private static final Set<String> ILLEGAL_NAMES = new HashSet<String>(){{ + add(""); + add(".."); + add("."); + if (IS_WINDOWS) { + add("con"); + add("prn"); + add("nul"); + add("aux"); + for (int i = 1; i <= 9; i++) { + add("com" + i); + add("lpt" + i); + } + } + }}; + @SuppressWarnings("serial") + private static final Set<Character> ILLEGAL_CHARS = new HashSet<Character>(){{ + add('/'); + for (char i=0 ; i<32 ; i++) + add(i); + if (IS_WINDOWS) { + add('\\'); + add('>'); + add('<'); + add(':'); + add('"'); + add('|'); + add('?'); + add('*'); + } else { + add(' '); // whitespace; too much trouble from these + add('\t'); + add('\r'); + add('\n'); + } + }}; + @SuppressWarnings("serial") + private static final Set<String> ILLEGAL_PREFIXES = new HashSet<String>(){{ + if (IS_WINDOWS) { + add("con."); + add("prn."); + add("nul."); + add("aux."); + for (int i = 1; i <= 9; i++) { + add("com" + i + "."); + add("lpt" + i + "."); + } + } + }}; + @SuppressWarnings("serial") + private static final Set<String> ILLEGAL_SUFFIXES = new HashSet<String>(){{ + if (IS_WINDOWS) { + add(" "); + add("."); + } + }}; + + /** + * Construct a file handle, applying platform-specific filename validation + * rules in the process. + * + * @param dir + * The directory acting as a root, which is assumed to be + * correctly named. May be <tt>null</tt>. + * @param names + * The names of filename fragments to apply the checks to. Must + * have at least one value. + * @return The file handle. Never <tt>null</tt>. + * @throws IOException + * If validation fails. + */ + public static File getValidatedFile(File dir, String... 
names) + throws IOException { + if (names.length == 0) + throw new IOException("empty filename"); + File f = dir; + for (String name : names) { + String low = name.toLowerCase(); + if (ILLEGAL_NAMES.contains(low)) + throw new IOException("illegal filename"); + for (char c : ILLEGAL_CHARS) + if (low.indexOf(c) >= 0) + throw new IOException("illegal filename"); + for (String s : ILLEGAL_PREFIXES) + if (low.startsWith(s)) + throw new IOException("illegal filename"); + for (String s : ILLEGAL_SUFFIXES) + if (low.endsWith(s)) + throw new IOException("illegal filename"); + f = new File(f, name); + } + assert f != null; + return f; + } + + /** + * Create a file handle where the underlying file must exist. + * + * @param dir + * The directory that will contain the file. + * @param name + * The name of the file; will be validated. + * @return The handle. + * @throws IOException + * If validation fails or the file doesn't exist. + */ + public static File getValidatedExistingFile(File dir, String name) + throws IOException { + File f = getValidatedFile(dir, name); + if (!f.exists()) + throw new IOException("doesn't exist"); + return f; + } + + /** + * Create a file handle where the underlying file must <i>not</i> exist. + * + * @param dir + * The directory that will contain the file. + * @param name + * The name of the file; will be validated. + * @return The handle. The file will not be created by this method. + * @throws IOException + * If validation fails or the file does exist. 
+ */ + public static File getValidatedNewFile(File dir, String name) + throws IOException { + File f = getValidatedFile(dir, name); + if (f.exists()) + throw new IOException("already exists"); + return f; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java new file mode 100644 index 0000000..3dd3ac1 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java @@ -0,0 +1,40 @@ +package org.taverna.server.localworker.impl.utils; + +import javax.annotation.Nullable; + +/** + * A class that handles running a task that can take some time. 
+ * + * @author Donal Fellows + * + */ +public abstract class TimingOutTask extends Thread { + public abstract void doIt() throws Exception; + + @Nullable + private Exception ioe; + + @Override + public final void run() { + try { + doIt(); + } catch (Exception ioe) { + this.ioe = ioe; + } + } + + public TimingOutTask() { + this.setDaemon(true); + } + + public void doOrTimeOut(long timeout) throws Exception { + start(); + try { + join(timeout); + } catch (InterruptedException e) { + interrupt(); + } + if (ioe != null) + throw ioe; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/main/resources/security.policy ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/resources/security.policy b/taverna-server-worker/src/main/resources/security.policy new file mode 100644 index 0000000..5b5c322 --- /dev/null +++ b/taverna-server-worker/src/main/resources/security.policy @@ -0,0 +1,11 @@ +//keystore "signers.jks"; +//grant signedBy "taverna" { +// permission java.util.PropertyPermission "*", "read,write"; +// permission java.lang.RuntimePermission "shutdownHooks"; +// permission java.lang.RuntimePermission "exitVM"; +// permission java.io.FilePermission "<<ALL FILES>>", "read,write,execute,delete"; +// permission java.net.SocketPermission "localhost:1024-" "accept,connect,listen"; +//}; +grant { + permission java.security.AllPermission "*:*"; +}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/2c71f9a9/taverna-server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java b/taverna-server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java new file mode 
/*
 * Copyright (C) 2010-2012 The University of Manchester
 * 
 * See the file "LICENSE" for license terms.
 */
package org.taverna.server.localworker.impl;

import static java.util.UUID.randomUUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.taverna.server.localworker.impl.LocalWorker.DO_MKDIR;

import java.io.File;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.taverna.server.localworker.api.Worker;
import org.taverna.server.localworker.api.WorkerFactory;
import org.taverna.server.localworker.remote.IllegalStateTransitionException;
import org.taverna.server.localworker.remote.ImplementationException;
import org.taverna.server.localworker.remote.RemoteDirectory;
import org.taverna.server.localworker.remote.RemoteInput;
import org.taverna.server.localworker.remote.RemoteListener;
import org.taverna.server.localworker.remote.RemoteStatus;
import org.taverna.server.localworker.server.UsageRecordReceiver;

/**
 * Unit tests for {@link LocalWorker}. The worker under test is wired to a
 * stub {@link Worker} ({@link DummyWorker}) that appends a marker to
 * {@link #events} for every call it receives, so that each test can assert on
 * the exact sequence of operations that were delegated to the back-end.
 * <p>
 * Several expected traces contain the literal <tt>"36"</tt>; this is the
 * length of the working directory's name as recorded by
 * {@link DummyWorker#initWorker}. NOTE(review): presumably the directory is
 * named after a UUID string (36 characters) - confirm against
 * <tt>LocalWorker</tt>.
 */
public class LocalWorkerTest {
	/** The instance under test; recreated for every test in {@link #setUp()}. */
	LocalWorker lw;
	/**
	 * Log of the calls made on the stub worker, in order. Reset in
	 * {@link #setUp()}.
	 */
	static List<String> events;

	/** The status that {@link DummyWorker#getWorkerStatus()} will report. */
	public static RemoteStatus returnThisStatus = RemoteStatus.Operating;

	/**
	 * Stub back-end that performs no real work: every operation just records
	 * what it was asked to do in {@link LocalWorkerTest#events}.
	 */
	static class DummyWorker implements Worker {
		/**
		 * @return a listener with fixed, recognisable answers; only
		 *         <tt>setProperty</tt> has a side effect, which is to log its
		 *         arguments to the event list.
		 */
		@Override
		public RemoteListener getDefaultListener() {
			return new RemoteListener() {
				@Override
				public String getConfiguration() {
					return "RLCONFIG";
				}

				@Override
				public String getName() {
					return "RLNAME";
				}

				@Override
				public String getProperty(String propName) {
					return "RLPROP[" + propName + "]";
				}

				@Override
				public String getType() {
					return "RLTYPE";
				}

				@Override
				public String[] listProperties() {
					return new String[] { "RLP1", "RLP2" };
				}

				@Override
				public void setProperty(String propName, String value) {
					events.add("setProperty[");
					events.add(propName);
					events.add(value);
					events.add("]");
				}
			};
		}

		/**
		 * Logs the query (as <tt>status=...</tt>) and answers with whatever
		 * {@link LocalWorkerTest#returnThisStatus} currently holds.
		 */
		@Override
		public RemoteStatus getWorkerStatus() {
			events.add("status=" + returnThisStatus);
			return returnThisStatus;
		}

		/**
		 * Logs a bracketed <tt>init[ ... ]</tt> record describing the
		 * arguments: the command, the workflow decoded as UTF-8, the length of
		 * the working directory's name, the input Baclava file, the input
		 * files (by file name) and input values (both in key order), and the
		 * name of the output Baclava file. Absent files are logged as
		 * <tt>&lt;null&gt;</tt>. The remaining arguments are not recorded yet
		 * (see the TODOs).
		 * 
		 * @return always <tt>true</tt>.
		 */
		@Override
		public boolean initWorker(LocalWorker local,
				String executeWorkflowCommand, byte[] workflow,
				File workingDir, File inputBaclava,
				Map<String, File> inputFiles, Map<String, String> inputValues,
				Map<String, String> delimiters, File outputBaclava, File cmdir,
				char[] cmpass, boolean doprov, Map<String, String> env,
				String id, List<String> conf) throws Exception {
			events.add("init[");
			events.add(executeWorkflowCommand);
			events.add(new String(workflow, "UTF-8"));
			int dirLen = workingDir.getName().length();
			events.add(Integer.toString(dirLen));
			// NOTE(review): this drops dirLen characters from the front of the
			// full path string, but dirLen is the length of the directory's
			// *name*, not of its path - confirm this is what is intended.
			events.add(inputBaclava == null ? "<null>" : inputBaclava
					.toString().substring(dirLen));
			// Copy into sorted maps so the logged text has a stable key order.
			Map<String, String> in = new TreeMap<>();
			for (Entry<String, File> name : inputFiles.entrySet())
				in.put(name.getKey(), name.getValue() == null ? "<null>" : name
						.getValue().getName());
			events.add(in.toString());
			events.add(new TreeMap<>(inputValues).toString());
			events.add(outputBaclava == null ? "<null>" : outputBaclava
					.getName());
			// TODO: check cmdir and cmpass
			// TODO: check doprov
			// TODO: log env
			// TODO: check delimiters
			events.add("]");
			return true;
		}

		@Override
		public void killWorker() throws Exception {
			events.add("kill");
		}

		@Override
		public void startWorker() throws Exception {
			events.add("start");
		}

		@Override
		public void stopWorker() throws Exception {
			events.add("stop");
		}

		@Override
		public void setURReceiver(UsageRecordReceiver receiver) {
			// We just ignore this
		}

		@Override
		public void deleteLocalResources() throws ImplementationException {
			// Nothing to do here
		}
	}

	/** Factory handed to the worker under test; always builds the stub. */
	WorkerFactory factory = new WorkerFactory() {
		@Override
		public Worker makeInstance() throws Exception {
			return new DummyWorker();
		}
	};

	/**
	 * Builds a fresh worker (command <tt>"XWC"</tt>, workflow <tt>"WF"</tt>)
	 * and resets the shared event log and reported status.
	 */
	@Before
	public void setUp() throws Exception {
		lw = new LocalWorker("XWC", "WF".getBytes("UTF-8"), null, randomUUID(),
				new HashMap<String, String>(), new ArrayList<String>(), factory);
		events = new ArrayList<>();
		returnThisStatus = RemoteStatus.Operating;
	}

	/** Destroys the worker created by {@link #setUp()}. */
	@After
	public void tearDown() throws Exception {
		lw.destroy();
	}

	/** Shorthand for building the expected event list. */
	private List<String> l(String... strings) {
		return Arrays.asList(strings);
	}

	// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

	/** Destroying a worker that was never started makes no back-end calls. */
	@Test
	public void testDestroy1() throws Exception {
		lw.destroy();
		assertEquals(l(), events);
	}

	/** Destroying a running worker is expected to kill the back-end. */
	@Test
	public void testDestroy2() throws Exception {
		lw.setStatus(RemoteStatus.Operating);
		lw.destroy();
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "kill"), events);
	}

	/** Destroying a stopped worker is expected to kill the back-end. */
	@Test
	public void testDestroy3() throws Exception {
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Stopped);
		lw.destroy();
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "stop", "kill"), events);
	}

	/**
	 * Once the worker has been finished (which already kills the back-end),
	 * destroying it is expected to add no further back-end calls.
	 */
	@Test
	public void testDestroy4() throws Exception {
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Finished);
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "kill"), events);
		lw.destroy();
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "kill"), events);
	}

	/**
	 * Adding a listener is expected to fail with an
	 * {@link ImplementationException} saying "not implemented".
	 */
	@Test
	public void testAddListener() {
		Throwable t = null;
		try {
			lw.addListener(null);
		} catch (Throwable caught) {
			t = caught;
		}
		assertNotNull(t);
		assertSame(ImplementationException.class, t.getClass());
		assertNotNull(t.getMessage());
		assertEquals("not implemented", t.getMessage());
	}

	/**
	 * The input Baclava file starts unset, can be set, and is expected to be
	 * cleared again when an individual input is given a value.
	 */
	@Test
	public void testGetInputBaclavaFile() throws Exception {
		assertNull(lw.getInputBaclavaFile());
		lw.setInputBaclavaFile("IBaclava");
		assertNotNull(lw.getInputBaclavaFile());
		assertEquals("IBaclava", lw.getInputBaclavaFile());
		lw.makeInput("FOO").setValue("BAR");
		assertNull(lw.getInputBaclavaFile());
	}

	/**
	 * An input given a value is listed with that value and no file; setting
	 * an input Baclava file afterwards is expected to keep the input listed
	 * but clear both its file and its value.
	 */
	@Test
	public void testGetInputsWithValue() throws Exception {
		assertEquals(0, lw.getInputs().size());

		lw.makeInput("FOO").setValue("BAR");

		assertEquals(1, lw.getInputs().size());
		assertEquals("FOO", lw.getInputs().get(0).getName());
		assertNull(lw.getInputs().get(0).getFile());
		assertNotNull(lw.getInputs().get(0).getValue());

		lw.setInputBaclavaFile("BLAH");

		assertEquals(1, lw.getInputs().size());
		assertNull(lw.getInputs().get(0).getFile());
		assertNull(lw.getInputs().get(0).getValue());
	}

	/**
	 * As {@link #testGetInputsWithValue()}, but for an input that was bound
	 * to a file instead of a literal value.
	 */
	@Test
	public void testGetInputsWithFile() throws Exception {
		assertEquals(0, lw.getInputs().size());

		lw.makeInput("BAR").setFile("FOO");

		assertEquals(1, lw.getInputs().size());
		assertEquals("BAR", lw.getInputs().get(0).getName());
		assertNotNull(lw.getInputs().get(0).getFile());
		assertNull(lw.getInputs().get(0).getValue());

		lw.setInputBaclavaFile("BLAH");

		assertEquals(1, lw.getInputs().size());
		assertNull(lw.getInputs().get(0).getFile());
		assertNull(lw.getInputs().get(0).getValue());
	}

	/** No listener types are expected to be advertised. */
	@Test
	public void testGetListenerTypes() {
		assertEquals("[]", lw.getListenerTypes().toString());
	}

	/**
	 * Exactly one listener is expected, and it must pass calls through to the
	 * stub's default listener (checked via the stub's canned answers and its
	 * <tt>setProperty</tt> log entries).
	 */
	@Test
	public void testGetListeners() throws Exception {
		assertEquals(1, lw.getListeners().size());
		RemoteListener rl = lw.getListeners().get(0);
		assertEquals("RLNAME", rl.getName());
		assertEquals("RLCONFIG", rl.getConfiguration());
		assertEquals("RLTYPE", rl.getType());
		assertEquals("[RLP1, RLP2]", Arrays.asList(rl.listProperties())
				.toString());
		assertEquals("RLPROP[RLP1]", rl.getProperty("RLP1"));
		assertEquals("RLPROP[RLP2]", rl.getProperty("RLP2"));
		rl.setProperty("FOOBAR", "BARFOO");
		assertEquals(l("setProperty[", "FOOBAR", "BARFOO", "]"), events);
	}

	/** The output Baclava file starts unset and can be set and cleared. */
	@Test
	public void testGetOutputBaclavaFile() throws Exception {
		assertNull(lw.getOutputBaclavaFile());
		lw.setOutputBaclavaFile("notnull");
		assertEquals("notnull", lw.getOutputBaclavaFile());
		lw.setOutputBaclavaFile(null);
		assertNull(lw.getOutputBaclavaFile());
	}

	/**
	 * A security context must be obtainable. <tt>DO_MKDIR</tt> is switched
	 * off for the duration of the call and restored afterwards.
	 */
	@Test
	public void testGetSecurityContext() throws Exception {
		boolean md = DO_MKDIR;
		LocalWorker.DO_MKDIR = false; // HACK! Work around Hudson problem...
		try {
			assertNotNull(lw.getSecurityContext());
		} finally {
			LocalWorker.DO_MKDIR = md;
		}
	}

	/** A freshly created worker reports the Initialized state. */
	@Test
	public void testGetStatusInitial() {
		assertEquals(RemoteStatus.Initialized, lw.getStatus());
	}

	/**
	 * Checks how status queries are forwarded to the back-end. Per the
	 * expected trace, the back-end is not asked before the run has been
	 * started, is asked on each query while it reports Operating, and is not
	 * asked again once it has reported Finished (the later change of the
	 * stub's answer to Stopped is never observed).
	 */
	@Test
	public void testGetStatus() throws Exception {
		assertEquals(RemoteStatus.Initialized, lw.getStatus());
		returnThisStatus = RemoteStatus.Operating;
		assertEquals(RemoteStatus.Initialized, lw.getStatus());
		lw.setStatus(RemoteStatus.Operating);
		assertEquals(RemoteStatus.Operating, lw.getStatus());
		assertEquals(RemoteStatus.Operating, lw.getStatus());
		returnThisStatus = RemoteStatus.Finished;
		assertEquals(RemoteStatus.Finished, lw.getStatus());
		returnThisStatus = RemoteStatus.Stopped;
		assertEquals(RemoteStatus.Finished, lw.getStatus());
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "status=Operating", "status=Operating",
						"status=Finished"), events);
	}

	/**
	 * The working directory handle must exist, be listable, have no parent,
	 * and present an empty name.
	 */
	@Test
	public void testGetWorkingDirectory() throws Exception {
		RemoteDirectory rd = lw.getWorkingDirectory();
		assertNotNull(rd);
		assertNotNull(rd.getContents());
		assertNull(rd.getContainingDirectory());
		assertNotNull(rd.getName());
		assertEquals(-1, rd.getName().indexOf('/'));
		assertFalse("..".equals(rd.getName()));
		assertEquals("", rd.getName());
	}

	/** File names that must be accepted (no exception expected). */
	@Test
	public void testValidateFilename() throws Exception {
		lw.validateFilename("foobar");
		lw.validateFilename("foo/bar");
		lw.validateFilename("foo.bar");
		lw.validateFilename("foo..bar");
	}

	// The following cases must each be rejected with an
	// IllegalArgumentException.

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad0() throws Exception {
		lw.validateFilename("./.");
	}

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad1() throws Exception {
		lw.validateFilename("/");
	}

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad2() throws Exception {
		lw.validateFilename("");
	}

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad3() throws Exception {
		lw.validateFilename(null);
	}

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad4() throws Exception {
		lw.validateFilename("..");
	}

	@Test(expected = IllegalArgumentException.class)
	public void testValidateFilenameBad5() throws Exception {
		lw.validateFilename("foo/../bar");
	}

	/**
	 * Creating inputs: a new input is registered with no file or value;
	 * setting its file or value is expected to clear the input Baclava file,
	 * and setting a value is expected to clear a previously set file.
	 */
	@Test
	public void testMakeInput() throws Exception {
		assertEquals(0, lw.getInputs().size());

		RemoteInput ri = lw.makeInput("TEST");

		assertNotNull(ri);
		assertEquals(1, lw.getInputs().size());
		assertNotSame(ri, lw.getInputs().get(0)); // different delegates
		assertEquals("TEST", ri.getName());
		assertNull(ri.getFile());
		assertNull(ri.getValue());

		lw.setInputBaclavaFile("bad");
		ri.setFile("good");
		assertEquals("good", ri.getFile());
		assertNull(lw.getInputBaclavaFile());
		ri.setValue("very good");
		assertEquals("very good", ri.getValue());
		assertNull(ri.getFile());
		assertNull(lw.getInputBaclavaFile());

		lw.makeInput("TEST2");
		assertEquals(2, lw.getInputs().size());
	}

	/** An input's file name must be rejected if it is not a sane path. */
	@Test(expected = IllegalArgumentException.class)
	public void testMakeInputFileSanity() throws Exception {
		lw.makeInput("foo").setFile("/../bar");
	}

	/**
	 * Manufacturing a listener is expected to fail with a plain
	 * {@link RemoteException} saying "listener manufacturing unsupported".
	 */
	@Test
	public void testMakeListener() {
		Throwable t = null;
		try {
			lw.makeListener("?", "?");
		} catch (Throwable caught) {
			t = caught;
		}
		assertNotNull(t);
		assertSame(RemoteException.class, t.getClass());
		assertNotNull(t.getMessage());
		assertEquals("listener manufacturing unsupported", t.getMessage());
	}

	/** Setting the input Baclava file makes it readable back. */
	@Test
	public void testSetInputBaclavaFile1() throws Exception {
		assertNull(lw.getInputBaclavaFile());
		lw.setInputBaclavaFile("eg");
		assertEquals("eg", lw.getInputBaclavaFile());
	}

	/**
	 * Setting the input Baclava file is expected to clear the value of an
	 * input that was created earlier, as seen through the original handle.
	 */
	@Test
	public void testSetInputBaclavaFile2() throws Exception {
		RemoteInput ri = lw.makeInput("foo");
		ri.setValue("bar");
		assertEquals("bar", ri.getValue());
		lw.setInputBaclavaFile("eg");
		assertNull(ri.getValue());
	}

	/**
	 * The output Baclava file accepts plain and nested relative names and
	 * <tt>null</tt>; both the field and the getter must reflect the value.
	 */
	@Test
	public void testSetOutputBaclavaFile1() throws Exception {
		assertNull(lw.outputBaclava);
		lw.setOutputBaclavaFile("foobar");
		assertEquals("foobar", lw.outputBaclava);
		assertEquals("foobar", lw.getOutputBaclavaFile());
		lw.setOutputBaclavaFile("foo/bar");
		assertEquals("foo/bar", lw.outputBaclava);
		assertEquals("foo/bar", lw.getOutputBaclavaFile());
		lw.setOutputBaclavaFile(null);
		assertNull(lw.outputBaclava);
		assertNull(lw.getOutputBaclavaFile());
	}

	/** An absolute output Baclava file name must be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void testSetOutputBaclavaFile2() throws Exception {
		lw.setOutputBaclavaFile("/foobar");
	}

	/** An output Baclava file name containing ".." must be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void testSetOutputBaclavaFile3() throws Exception {
		lw.setOutputBaclavaFile("foo/../bar");
	}

	/**
	 * Walks Initialized, Operating, Stopped, Operating, Finished, requesting
	 * every state twice. Per the expected trace, each real transition reaches
	 * the back-end exactly once and the repeated requests add nothing.
	 */
	@Test
	public void testSetStatus0() throws Exception {
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Stopped);
		lw.setStatus(RemoteStatus.Stopped);
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Finished);
		lw.setStatus(RemoteStatus.Finished);
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "stop", "start", "kill"), events);
	}

	/** Operating, Stopped, Finished: expects init, stop, then kill. */
	@Test
	public void testSetStatus1() throws Exception {
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Stopped);
		lw.setStatus(RemoteStatus.Finished);
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
						"]", "stop", "kill"), events);
	}

	/**
	 * Going straight from Initialized to Finished is allowed and makes no
	 * back-end calls.
	 */
	@Test
	public void testSetStatus2() throws Exception {
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Finished);
		assertEquals(l(), events);
	}

	// The remaining setStatus cases are transitions that must be refused
	// with an IllegalStateTransitionException.

	/** Finished back to Initialized. */
	@Test(expected = IllegalStateTransitionException.class)
	public void testSetStatus3() throws Exception {
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Finished);
		lw.setStatus(RemoteStatus.Initialized);
	}

	/** Operating back to Initialized. */
	@Test(expected = IllegalStateTransitionException.class)
	public void testSetStatus4() throws Exception {
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Initialized);
	}

	/** Initialized directly to Stopped. */
	@Test(expected = IllegalStateTransitionException.class)
	public void testSetStatus5() throws Exception {
		lw.setStatus(RemoteStatus.Initialized);
		lw.setStatus(RemoteStatus.Stopped);
	}

	/** Finished to Stopped. */
	@Test(expected = IllegalStateTransitionException.class)
	public void testSetStatus6() throws Exception {
		lw.setStatus(RemoteStatus.Finished);
		lw.setStatus(RemoteStatus.Stopped);
	}

	/** Stopped back to Initialized. */
	@Test(expected = IllegalStateTransitionException.class)
	public void testSetStatus7() throws Exception {
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Stopped);
		lw.setStatus(RemoteStatus.Initialized);
	}

	/**
	 * End-to-end run with one file input, one value input and an output
	 * Baclava file (the second setting wins), checking what the back-end is
	 * initialised with and that finishing kills it.
	 */
	@Test
	public void testLifecycle() throws Exception {
		lw.makeInput("foo").setFile("foofile");
		lw.makeInput("bar").setValue("barvalue");
		lw.setOutputBaclavaFile("spong");
		lw.setOutputBaclavaFile("boo");
		lw.setStatus(RemoteStatus.Operating);
		lw.setStatus(RemoteStatus.Finished);
		// Assumes order of map, so fragile but works...
		assertEquals(
				l("init[", "XWC", "WF", "36", "<null>",
						"{bar=<null>, foo=foofile}",
						"{bar=barvalue, foo=null}", "boo", "]", "kill"), events);
	}
}
