http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/WorkerCore.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/WorkerCore.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/WorkerCore.java new file mode 100644 index 0000000..4aa6605 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/WorkerCore.java @@ -0,0 +1,931 @@ +/* + */ +package org.taverna.server.localworker.impl; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static java.io.File.createTempFile; +import static java.io.File.pathSeparator; +import static java.lang.Boolean.parseBoolean; +import static java.lang.Double.parseDouble; +import static java.lang.Integer.parseInt; +import static java.lang.Long.parseLong; +import static java.lang.Runtime.getRuntime; +import static java.lang.System.out; +import static java.net.InetAddress.getLocalHost; +import static org.apache.commons.io.FileUtils.forceDelete; +import static org.apache.commons.io.FileUtils.sizeOfDirectory; +import static org.apache.commons.io.FileUtils.write; +import static org.apache.commons.io.IOUtils.copy; +import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_DIRECTORY; +import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_PASSWORD; +import static org.taverna.server.localworker.api.Constants.DEATH_TIME; +import static org.taverna.server.localworker.api.Constants.DEFAULT_LISTENER_NAME; +import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD; +import static org.taverna.server.localworker.api.Constants.START_WAIT_TIME; +import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING; +import static org.taverna.server.localworker.api.Constants.TIME; +import static org.taverna.server.localworker.impl.Status.Aborted; +import static org.taverna.server.localworker.impl.Status.Completed; +import static org.taverna.server.localworker.impl.Status.Failed; +import static org.taverna.server.localworker.impl.Status.Held; +import static org.taverna.server.localworker.impl.Status.Started; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionFeedPath; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionHost; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionPort; +import static org.taverna.server.localworker.impl.TavernaRunManager.interactionWebdavPath; +import static 
org.taverna.server.localworker.impl.WorkerCore.pmap; +import static org.taverna.server.localworker.remote.RemoteStatus.Finished; +import static org.taverna.server.localworker.remote.RemoteStatus.Initialized; +import static org.taverna.server.localworker.remote.RemoteStatus.Operating; +import static org.taverna.server.localworker.remote.RemoteStatus.Stopped; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.ws.Holder; + +import org.apache.taverna.server.usagerecord.JobUsageRecord; +import org.taverna.server.localworker.api.RunAccounting; +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.impl.utils.TimingOutTask; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.server.UsageRecordReceiver; + +/** + * The core class that connects to a Taverna command-line workflow execution + * engine. 
This implementation always registers a single listener, &lquo; + * <tt>io</tt> &rquo;, with two properties representing the stdout and stderr of + * the run and one representing the exit code. The listener is + * remote-accessible. It does not support attaching any other listeners. + * + * @author Donal Fellows + */ +@SuppressWarnings("serial") +public class WorkerCore extends UnicastRemoteObject implements Worker, + RemoteListener { + @Nonnull + static final Map<String, Property> pmap = new HashMap<>(); + /** + * Regular expression to extract the detailed timing information from the + * output of /usr/bin/time + */ + @Nonnull + private static final Pattern TimeRE; + static { + final String TIMERE = "([0-9.:]+)"; + final String TERMS = "(real|user|system|sys|elapsed)"; + TimeRE = Pattern.compile(TIMERE + " *" + TERMS + "[ \t]*" + TIMERE + + " *" + TERMS + "[ \t]*" + TIMERE + " *" + TERMS); + } + + /** + * Environment variables to remove before any fork (because they're large or + * potentially leaky). + */ + // TODO Conduct a proper survey of what to remove + @Nonnull + private static final String[] ENVIRONMENT_TO_REMOVE = { "SUDO_COMMAND", + "SUDO_USER", "SUDO_GID", "SUDO_UID", "DISPLAY", "LS_COLORS", + "XFILESEARCHPATH", "SSH_AGENT_PID", "SSH_AUTH_SOCK" }; + + @Nullable + Process subprocess; + @Nonnull + final StringWriter stdout; + @Nonnull + final StringWriter stderr; + @Nullable + Integer exitCode; + boolean readyToSendEmail; + @Nullable + String emailAddress; + @Nullable + Date start; + @Nonnull + final RunAccounting accounting; + @Nonnull + final Holder<Integer> pid; + + private boolean finished; + @Nullable + private JobUsageRecord ur; + @Nullable + private File wd; + @Nullable + private UsageRecordReceiver urreceiver; + @Nullable + private File workflowFile; + private boolean stopped; + + /** + * @param accounting + * Object that looks after how many runs are executing. 
+ * @throws RemoteException + */ + public WorkerCore(@Nonnull RunAccounting accounting) throws RemoteException { + super(); + stdout = new StringWriter(); + stderr = new StringWriter(); + pid = new Holder<>(); + this.accounting = accounting; + } + + private int getPID() { + synchronized (pid) { + if (pid.value == null) + return -1; + return pid.value; + } + } + + /** + * Fire up the workflow. This causes a transition into the operating state. + * + * @param executeWorkflowCommand + * The command to run to execute the workflow. + * @param workflow + * The workflow document to execute. + * @param workingDir + * What directory to use as the working directory. + * @param inputBaclava + * The baclava file to use for inputs, or <tt>null</tt> to use + * the other <b>input*</b> arguments' values. + * @param inputFiles + * A mapping of input names to files that supply them. Note that + * we assume that nothing mapped here will be mapped in + * <b>inputValues</b>. + * @param inputValues + * A mapping of input names to values to supply to them. Note + * that we assume that nothing mapped here will be mapped in + * <b>inputFiles</b>. + * @param outputBaclava + * What baclava file to write the output from the workflow into, + * or <tt>null</tt> to have it written into the <tt>out</tt> + * subdirectory. + * @param token + * The name of the workflow run. + * @return <tt>true</tt> if the worker started, or <tt>false</tt> if a + * timeout occurred. + * @throws IOException + * If any of quite a large number of things goes wrong. 
+ */ + @Override + public boolean initWorker( + @Nonnull final LocalWorker local, + @Nonnull final String executeWorkflowCommand, + @Nonnull final byte[] workflow, + @Nonnull final File workingDir, + @Nullable final File inputBaclava, + @Nonnull final Map<String, File> inputFiles, + @Nonnull final Map<String, String> inputValues, + @Nonnull final Map<String, String> inputDelimiters, + @Nullable final File outputBaclava, + @Nonnull final File securityDir, + @Nullable final char[] password, + final boolean generateProvenance, + @Nonnull final Map<String, String> environment, + @Nullable final String token, + @Nonnull final List<String> runtime) throws IOException { + try { + new TimingOutTask() { + @Override + public void doIt() throws IOException { + startExecutorSubprocess( + createProcessBuilder(local, executeWorkflowCommand, + workflow, workingDir, inputBaclava, + inputFiles, inputValues, inputDelimiters, + outputBaclava, securityDir, password, + generateProvenance, environment, token, + runtime), password); + } + }.doOrTimeOut(START_WAIT_TIME); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + return subprocess != null; + } + + private void startExecutorSubprocess(@Nonnull ProcessBuilder pb, + @Nullable char[] password) throws IOException { + // Start the subprocess + out.println("starting " + pb.command() + " in directory " + + pb.directory() + " with environment " + pb.environment()); + subprocess = pb.start(); + if (subprocess == null) + throw new IOException("unknown failure creating process"); + start = new Date(); + accounting.runStarted(); + + // Capture its stdout and stderr + new AsyncCopy(subprocess.getInputStream(), stdout, pid); + new AsyncCopy(subprocess.getErrorStream(), stderr); + if (password != null) + new PasswordWriterThread(subprocess, password); + } + + /** + * Assemble the process builder. Does not launch the subprocess. + * + * @param local + * The local worker container. 
+ * @param executeWorkflowCommand + * The reference to the workflow engine implementation. + * @param workflow + * The workflow to execute. + * @param workingDir + * The working directory to use. + * @param inputBaclava + * What file to read a baclava document from (or <tt>null</tt>) + * @param inputFiles + * The mapping from inputs to files. + * @param inputValues + * The mapping from inputs to literal values. + * @param outputBaclava + * What file to write a baclava document to (or <tt>null</tt>) + * @param securityDir + * The credential manager directory. + * @param password + * The password for the credential manager. + * @param environment + * The seed environment + * @param token + * The run identifier that the server wants to use. + * @param runtime + * Any runtime parameters to Java. + * @return The configured process builder. + * @throws IOException + * If file handling fails + * @throws UnsupportedEncodingException + * If we can't encode any text (unlikely) + * @throws FileNotFoundException + * If we can't write the workflow out (unlikely) + */ + @Nonnull + ProcessBuilder createProcessBuilder(@Nonnull LocalWorker local, + @Nonnull String executeWorkflowCommand, @Nonnull byte[] workflow, + @Nonnull File workingDir, @Nullable File inputBaclava, + @Nonnull Map<String, File> inputFiles, + @Nonnull Map<String, String> inputValues, + @Nonnull Map<String, String> inputDelimiters, + @Nullable File outputBaclava, @Nonnull File securityDir, + @Nonnull char[] password, boolean generateProvenance, + @Nonnull Map<String, String> environment, @Nonnull String token, + @Nonnull List<String> runtime) throws IOException, + UnsupportedEncodingException, FileNotFoundException { + ProcessBuilder pb = new ProcessBuilder(); + pb.command().add(TIME); + /* + * WARNING! HERE THERE BE DRAGONS! BE CAREFUL HERE! + * + * Work around _Maven_ bug with permissions in zip files! 
The executable + * bit is stripped by Maven's handling of file permissions, and there's + * no practical way to work around it without massively increasing the + * pain in other ways. Only want this on Unix - Windows isn't affected + * by this - so we use the file separator as a proxy for whether this is + * a true POSIX system. Ugly! Ugly ugly ugly... + * + * http://jira.codehaus.org/browse/MASSEMBLY-337 is relevant, but not + * the whole story as we don't want to use a non-standard packaging + * method as there's a real chance of it going wrong in an unexpected + * way then. Other parts of the story are that the executable bit isn't + * preserved when unpacking with the dependency plugin, and there's no + * way to be sure that the servlet container will preserve the bit + * either (as that's probably using a Java-based ZIP engine). + */ + if (File.separatorChar == '/') + pb.command().add("/bin/sh"); + pb.command().add(executeWorkflowCommand); + if (runtime != null) + pb.command().addAll(runtime); + + // Enable verbose logging + pb.command().add("-logfile"); + pb.command().add( + new File(new File(workingDir, "logs"), "detail.log") + .getAbsolutePath()); + + if (securityDir != null) { + pb.command().add(CREDENTIAL_MANAGER_DIRECTORY); + pb.command().add(securityDir.getAbsolutePath()); + out.println("security dir location: " + securityDir); + } + if (password != null) { + pb.command().add(CREDENTIAL_MANAGER_PASSWORD); + out.println("password of length " + password.length + + " will be written to subprocess stdin"); + } + + // Add arguments denoting inputs + if (inputBaclava != null) { + pb.command().add("-inputdoc"); + pb.command().add(inputBaclava.getAbsolutePath()); + if (!inputBaclava.exists()) + throw new IOException("input baclava file doesn't exist"); + } else { + for (Entry<String, File> port : inputFiles.entrySet()) { + if (port.getValue() == null) + continue; + pb.command().add("-inputfile"); + pb.command().add(port.getKey()); + 
pb.command().add(port.getValue().getAbsolutePath()); + if (!port.getValue().exists()) + throw new IOException("input file for port \"" + port + + "\" doesn't exist"); + } + for (Entry<String, String> port : inputValues.entrySet()) { + if (port.getValue() == null) + continue; + pb.command().add("-inputfile"); + pb.command().add(port.getKey()); + File f = createTempFile(".tav_in_", null, workingDir); + pb.command().add(f.getAbsolutePath()); + write(f, port.getValue(), "UTF-8"); + } + for (Entry<String, String> delim : inputDelimiters.entrySet()) { + if (delim.getValue() == null) + continue; + pb.command().add("-inputdelimiter"); + pb.command().add(delim.getKey()); + pb.command().add(delim.getValue()); + } + } + + // Add arguments denoting outputs + if (outputBaclava != null) { + pb.command().add("-outputdoc"); + pb.command().add(outputBaclava.getAbsolutePath()); + if (!outputBaclava.getParentFile().exists()) + throw new IOException( + "parent directory of output baclava file does not exist"); + if (outputBaclava.exists()) + throw new IOException("output baclava file exists"); + // Provenance cannot be supported when using baclava output + } else { + File out = new File(workingDir, "out"); + if (!out.mkdir()) + throw new IOException("failed to make output directory \"out\""); + // Taverna needs the dir to *not* exist now + forceDelete(out); + pb.command().add("-outputdir"); + pb.command().add(out.getAbsolutePath()); + // Enable provenance generation + if (generateProvenance) { + pb.command().add("-embedded"); + pb.command().add("-provenance"); + pb.command().add("-provbundle"); + pb.command().add("out.bundle.zip"); + } + } + + // Add an argument holding the workflow + File tmp = createTempFile(".wf_", ".scufl2", workingDir); + try (OutputStream os = new FileOutputStream(tmp)) { + os.write(workflow); + } + pb.command().add(workflowFile.getAbsolutePath()); + + // Indicate what working directory to use + pb.directory(workingDir); + wd = workingDir; + + Map<String, 
String> env = pb.environment(); + for (String name : ENVIRONMENT_TO_REMOVE) + env.remove(name); + + // Merge any options we have had imposed on us from outside + env.putAll(environment); + + // Patch the environment to deal with TAVUTILS-17 + assert env.get("PATH") != null; + env.put("PATH", new File(System.getProperty("java.home"), "bin") + + pathSeparator + env.get("PATH")); + // Patch the environment to deal with TAVSERV-189 + env.put("TAVERNA_APPHOME", workingDir.getCanonicalPath()); + // Patch the environment to deal with TAVSERV-224 + env.put("TAVERNA_RUN_ID", token); + if (interactionHost != null || local.interactionFeedURL != null + || local.webdavURL != null) { + env.put("INTERACTION_HOST", makeInterHost(local.interactionFeedURL)); + env.put("INTERACTION_PORT", makeInterPort(local.interactionFeedURL)); + env.put("INTERACTION_FEED", makeInterPath(local.interactionFeedURL)); + env.put("INTERACTION_WEBDAV", + local.webdavURL != null ? local.webdavURL.getPath() + : interactionWebdavPath); + String pub = makeInterPublish(local.publishURL); + if (pub != null && !pub.isEmpty()) + env.put("INTERACTION_PUBLISH", pub); + } + return pb; + } + + @Nullable + private static String makeInterHost(@Nullable URL url) { + if (url == null) + return interactionHost; + return url.getProtocol() + "://" + url.getHost(); + } + + @Nullable + private static String makeInterPort(@Nullable URL url) { + if (url == null) + return interactionPort; + int port = url.getPort(); + if (port == -1) + port = url.getDefaultPort(); + return Integer.toString(port); + } + + @Nullable + private static String makeInterPublish(@Nullable URL url) + throws IOException { + if (url == null) + return null; + try { + URI uri = url.toURI(); + int port = uri.getPort(); + if (port == -1) + return uri.getScheme() + "://" + uri.getHost(); + else + return uri.getScheme() + "://" + uri.getHost() + ":" + port; + } catch (URISyntaxException e) { + throw new IOException("problem constructing publication url", e); + } 
+ } + + @Nullable + private static String makeInterPath(@Nullable URL url) { + if (url == null) + return interactionFeedPath; + return url.getPath(); + } + + /** + * Kills off the subprocess if it exists and is alive. + */ + @Override + public void killWorker() { + if (!finished && subprocess != null) { + final Holder<Integer> code = new Holder<>(); + for (TimingOutTask tot : new TimingOutTask[] { new TimingOutTask() { + /** Check if the workflow terminated of its own accord */ + @Override + public void doIt() throws IOException { + code.value = subprocess.exitValue(); + accounting.runCeased(); + buildUR(code.value == 0 ? Completed : Failed, code.value); + } + }, new TimingOutTask() { + /** Tell the workflow to stop */ + @Override + public void doIt() throws IOException { + code.value = killNicely(); + accounting.runCeased(); + buildUR(code.value == 0 ? Completed : Aborted, code.value); + } + }, new TimingOutTask() { + /** Kill the workflow, kill it with fire */ + @Override + public void doIt() throws IOException { + code.value = killHard(); + accounting.runCeased(); + buildUR(code.value == 0 ? Completed : Aborted, code.value); + } + } }) { + try { + tot.doOrTimeOut(DEATH_TIME); + } catch (Exception e) { + } + if (code.value != null) + break; + } + finished = true; + setExitCode(code.value); + readyToSendEmail = true; + } + } + + /** + * Integrated spot to handle writing/logging of the exit code. + * + * @param code + * The exit code. 
+ */ + private void setExitCode(int code) { + exitCode = code; + if (code > 256 - 8) { + out.println("workflow aborted, Raven issue = " + (code - 256)); + } else if (code > 128) { + out.println("workflow aborted, signal=" + (code - 128)); + } else { + out.println("workflow exited, code=" + code); + } + } + + @Nonnull + private JobUsageRecord newUR() throws DatatypeConfigurationException { + try { + if (wd != null) + return new JobUsageRecord(wd.getName()); + } catch (RuntimeException e) { + } + return new JobUsageRecord("unknown"); + } + + /** + * Fills in the accounting information from the exit code and stderr. + * + * @param exitCode + * The exit code from the program. + */ + private void buildUR(@Nonnull Status status, int exitCode) { + try { + Date now = new Date(); + long user = -1, sys = -1, real = -1; + Matcher m = TimeRE.matcher(stderr.toString()); + ur = newUR(); + while (m.find()) + for (int i = 1; i < 6; i += 2) + if (m.group(i + 1).equals("user")) + user = parseDuration(m.group(i)); + else if (m.group(i + 1).equals("sys") + || m.group(i + 1).equals("system")) + sys = parseDuration(m.group(i)); + else if (m.group(i + 1).equals("real") + || m.group(i + 1).equals("elapsed")) + real = parseDuration(m.group(i)); + if (user != -1) + ur.addCpuDuration(user).setUsageType("user"); + if (sys != -1) + ur.addCpuDuration(sys).setUsageType("system"); + ur.addUser(System.getProperty("user.name"), null); + ur.addStartAndEnd(start, now); + if (real != -1) + ur.addWallDuration(real); + else + ur.addWallDuration(now.getTime() - start.getTime()); + ur.setStatus(status.toString()); + ur.addHost(getLocalHost().getHostName()); + ur.addResource("exitcode", Integer.toString(exitCode)); + ur.addDisk(sizeOfDirectory(wd)).setStorageUnit("B"); + if (urreceiver != null) + urreceiver.acceptUsageRecord(ur.marshal()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private long parseDuration(@Nonnull String durationString) { + try { + return (long) 
(parseDouble(durationString) * 1000); + } catch (NumberFormatException nfe) { + // Not a double; maybe MM:SS.mm or HH:MM:SS.mm + } + long dur = 0; + for (String d : durationString.split(":")) + try { + dur = 60 * dur + parseLong(d); + } catch (NumberFormatException nfe) { + // Assume that only one thing is fractional, and that it is last + return 60000 * dur + (long) (parseDouble(d) * 1000); + } + return dur * 1000; + } + + private void signal(@Nonnull String signal) throws Exception { + int pid = getPID(); + if (pid > 0 + && getRuntime().exec("kill -" + signal + " " + pid).waitFor() == 0) + return; + throw new Exception("failed to send signal " + signal + " to process " + + pid); + } + + @Nullable + private Integer killNicely() { + try { + signal("TERM"); + return subprocess.waitFor(); + } catch (Exception e) { + return null; + } + } + + @Nullable + private Integer killHard() { + try { + signal("QUIT"); + return subprocess.waitFor(); + } catch (Exception e) { + return null; + } + } + + /** + * Move the worker out of the stopped state and back to operating. + * + * @throws Exception + * if it fails. + */ + @Override + public void startWorker() throws Exception { + signal("CONT"); + stopped = false; + } + + /** + * Move the worker into the stopped state from the operating state. + * + * @throws Exception + * if it fails. + */ + @Override + public void stopWorker() throws Exception { + signal("STOP"); + stopped = true; + } + + /** + * @return The status of the workflow run. Note that this can be an + * expensive operation. + */ + @Override + public RemoteStatus getWorkerStatus() { + if (subprocess == null) + return Initialized; + if (finished) + return Finished; + try { + setExitCode(subprocess.exitValue()); + } catch (IllegalThreadStateException e) { + if (stopped) + return Stopped; + return Operating; + } + finished = true; + readyToSendEmail = true; + accounting.runCeased(); + buildUR(exitCode.intValue() == 0 ? 
Completed : Failed, exitCode); + return Finished; + } + + @Override + public String getConfiguration() { + return ""; + } + + @Override + public String getName() { + return DEFAULT_LISTENER_NAME; + } + + @Override + public String getProperty(String propName) throws RemoteException { + switch (Property.is(propName)) { + case STDOUT: + return stdout.toString(); + case STDERR: + return stderr.toString(); + case EXIT_CODE: + return (exitCode == null) ? "" : exitCode.toString(); + case EMAIL: + return emailAddress; + case READY_TO_NOTIFY: + return Boolean.toString(readyToSendEmail); + case USAGE: + try { + JobUsageRecord toReturn; + if (subprocess == null) { + toReturn = newUR(); + toReturn.setStatus(Held.toString()); + } else if (ur == null) { + toReturn = newUR(); + toReturn.setStatus(Started.toString()); + toReturn.addStartAndEnd(start, new Date()); + toReturn.addUser(System.getProperty("user.name"), null); + } else { + toReturn = ur; + } + /* + * Note that this record is not to be pushed to the server. 
That + * is done elsewhere (when a proper record is produced) + */ + return toReturn.marshal(); + } catch (Exception e) { + e.printStackTrace(); + return ""; + } + default: + throw new RemoteException("unknown property"); + } + } + + @Override + public String getType() { + return DEFAULT_LISTENER_NAME; + } + + @Override + public String[] listProperties() { + return Property.names(); + } + + @Override + public void setProperty(String propName, String value) + throws RemoteException { + switch (Property.is(propName)) { + case EMAIL: + emailAddress = value; + return; + case READY_TO_NOTIFY: + readyToSendEmail = parseBoolean(value); + return; + case STDOUT: + case STDERR: + case EXIT_CODE: + case USAGE: + throw new RemoteException("property is read only"); + default: + throw new RemoteException("unknown property"); + } + } + + @Override + public RemoteListener getDefaultListener() { + return this; + } + + @Override + public void setURReceiver(@Nonnull UsageRecordReceiver receiver) { + urreceiver = receiver; + } + + @Override + public void deleteLocalResources() throws ImplementationException { + try { + if (workflowFile != null && workflowFile.getParentFile().exists()) + forceDelete(workflowFile); + } catch (IOException e) { + throw new ImplementationException("problem deleting workflow file", + e); + } + } +} + +/** + * An engine for asynchronously copying from an {@link InputStream} to a + * {@link Writer}. 
+ * + * @author Donal Fellows + */ +class AsyncCopy extends Thread { + @Nonnull + private BufferedReader from; + @Nonnull + private Writer to; + @Nullable + private Holder<Integer> pidHolder; + + AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to) + throws UnsupportedEncodingException { + this(from, to, null); + } + + AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to, + @Nullable Holder<Integer> pid) throws UnsupportedEncodingException { + this.from = new BufferedReader(new InputStreamReader(from, + SYSTEM_ENCODING)); + this.to = to; + this.pidHolder = pid; + setDaemon(true); + start(); + } + + @Override + public void run() { + try { + if (pidHolder != null) { + String line = from.readLine(); + if (line.matches("^pid:\\d+$")) + synchronized (pidHolder) { + pidHolder.value = parseInt(line.substring(4)); + } + else + to.write(line + System.getProperty("line.separator")); + } + copy(from, to); + } catch (IOException e) { + } + } +} + +/** + * A helper for asynchronously writing a password to a subprocess's stdin. + * + * @author Donal Fellows + */ +class PasswordWriterThread extends Thread { + private OutputStream to; + private char[] chars; + + PasswordWriterThread(@Nonnull Process to, @Nonnull char[] chars) { + this.to = to.getOutputStream(); + assert chars != null; + this.chars = chars; + setDaemon(true); + start(); + } + + @Override + public void run() { + try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(to, + SYSTEM_ENCODING))) { + pw.println(chars); + } catch (UnsupportedEncodingException e) { + // Not much we can do here + e.printStackTrace(); + } finally { + /* + * We don't trust GC to clear password from memory. We also take + * care not to clear the default password! 
+ */ + if (chars != KEYSTORE_PASSWORD) + Arrays.fill(chars, '\00'); + } + } +} + +enum Property { + STDOUT("stdout"), STDERR("stderr"), EXIT_CODE("exitcode"), READY_TO_NOTIFY( + "readyToNotify"), EMAIL("notificationAddress"), USAGE("usageRecord"); + + private String s; + + private Property(String s) { + this.s = s; + pmap.put(s, this); + } + + @Override + public String toString() { + return s; + } + + public static Property is(@Nonnull String s) { + return pmap.get(s); + } + + @Nonnull + public static String[] names() { + return pmap.keySet().toArray(new String[pmap.size()]); + } +} + +enum Status { + Aborted, Completed, Failed, Held, Queued, Started, Suspended +}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/FilenameVerifier.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/FilenameVerifier.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/FilenameVerifier.java new file mode 100644 index 0000000..fa2e117 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/FilenameVerifier.java @@ -0,0 +1,169 @@ +/* + */ +package org.taverna.server.localworker.impl.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Utility class that handles filename validation on different target platforms. + * + * @author Donal Fellows. 
/**
 * Utility class that handles filename validation on different target
 * platforms. The rule sets are built once, at class initialisation, from the
 * <tt>os.name</tt> system property.
 *
 * @author Donal Fellows.
 */
public abstract class FilenameVerifier {
    private FilenameVerifier() {
    }

    /**
     * Whether the Windows rules apply. Matches on the <em>start</em> of the
     * lower-cased OS name: a plain substring search for "win" would also
     * match "Darwin". Lower-casing uses the root locale so the result does
     * not depend on the default locale.
     */
    static final boolean IS_WINDOWS = System.getProperty("os.name", "")
            .toLowerCase(java.util.Locale.ROOT).startsWith("windows");

    /** Whole names (lower-cased) that are never acceptable. */
    private static final Set<String> ILLEGAL_NAMES = illegalNames();
    /** Characters that may not appear anywhere in a name. */
    private static final Set<Character> ILLEGAL_CHARS = illegalChars();
    /** Lower-cased prefixes a name may not start with. */
    private static final Set<String> ILLEGAL_PREFIXES = illegalPrefixes();
    /** Suffixes a name may not end with. */
    private static final Set<String> ILLEGAL_SUFFIXES = illegalSuffixes();

    /** Builds {@link #ILLEGAL_NAMES}. */
    private static Set<String> illegalNames() {
        Set<String> names = new HashSet<>();
        names.add("");
        names.add("..");
        names.add(".");
        if (IS_WINDOWS) {
            names.add("con");
            names.add("prn");
            names.add("nul");
            names.add("aux");
            for (int i = 1; i <= 9; i++) {
                names.add("com" + i);
                names.add("lpt" + i);
            }
        }
        return names;
    }

    /** Builds {@link #ILLEGAL_CHARS}. */
    private static Set<Character> illegalChars() {
        Set<Character> chars = new HashSet<>();
        chars.add('/');
        for (char c = 0; c < 32; c++)
            chars.add(c);
        if (IS_WINDOWS) {
            chars.add('\\');
            chars.add('>');
            chars.add('<');
            chars.add(':');
            chars.add('"');
            chars.add('|');
            chars.add('?');
            chars.add('*');
        } else {
            chars.add(' '); // whitespace; too much trouble from these
            chars.add('\t');
            chars.add('\r');
            chars.add('\n');
        }
        return chars;
    }

    /** Builds {@link #ILLEGAL_PREFIXES}. */
    private static Set<String> illegalPrefixes() {
        Set<String> prefixes = new HashSet<>();
        if (IS_WINDOWS) {
            prefixes.add("con.");
            prefixes.add("prn.");
            prefixes.add("nul.");
            prefixes.add("aux.");
            for (int i = 1; i <= 9; i++) {
                prefixes.add("com" + i + ".");
                prefixes.add("lpt" + i + ".");
            }
        }
        return prefixes;
    }

    /** Builds {@link #ILLEGAL_SUFFIXES}. */
    private static Set<String> illegalSuffixes() {
        Set<String> suffixes = new HashSet<>();
        if (IS_WINDOWS) {
            suffixes.add(" ");
            suffixes.add(".");
        }
        return suffixes;
    }

    /**
     * Construct a file handle, applying platform-specific filename validation
     * rules in the process.
     *
     * @param dir
     *            The directory acting as a root, which is assumed to be
     *            correctly named. May be <tt>null</tt>.
     * @param names
     *            The names of filename fragments to apply the checks to. Must
     *            have at least one value.
     * @return The file handle. Never <tt>null</tt>.
     * @throws IOException
     *             If validation fails.
     */
    public static File getValidatedFile(File dir, String... names)
            throws IOException {
        if (names == null || names.length == 0)
            throw new IOException("empty filename");
        File f = dir;
        for (String name : names) {
            // A null fragment is reported like any other bad name rather
            // than surfacing as a NullPointerException.
            if (name == null)
                throw new IOException("illegal filename");
            String low = name.toLowerCase(java.util.Locale.ROOT);
            if (ILLEGAL_NAMES.contains(low))
                throw new IOException("illegal filename");
            for (int i = 0; i < low.length(); i++)
                if (ILLEGAL_CHARS.contains(low.charAt(i)))
                    throw new IOException("illegal filename");
            for (String s : ILLEGAL_PREFIXES)
                if (low.startsWith(s))
                    throw new IOException("illegal filename");
            for (String s : ILLEGAL_SUFFIXES)
                if (low.endsWith(s))
                    throw new IOException("illegal filename");
            f = new File(f, name);
        }
        assert f != null;
        return f;
    }

    /**
     * Create a file handle where the underlying file must exist.
     *
     * @param dir
     *            The directory that will contain the file.
     * @param name
     *            The name of the file; will be validated.
     * @return The handle.
     * @throws IOException
     *             If validation fails or the file doesn't exist.
     */
    public static File getValidatedExistingFile(File dir, String name)
            throws IOException {
        File f = getValidatedFile(dir, name);
        if (!f.exists())
            throw new IOException("doesn't exist");
        return f;
    }

    /**
     * Create a file handle where the underlying file must <i>not</i> exist.
     *
     * @param dir
     *            The directory that will contain the file.
     * @param name
     *            The name of the file; will be validated.
     * @return The handle. The file will not be created by this method.
     * @throws IOException
     *             If validation fails or the file does exist.
     */
    public static File getValidatedNewFile(File dir, String name)
            throws IOException {
        File f = getValidatedFile(dir, name);
        if (f.exists())
            throw new IOException("already exists");
        return f;
    }
}

// ---- mail-archive residue that shared the last physical line with the class
// ---- above: the start of the next file in this diff (truncated in this
// ---- view), carried over unchanged.
// http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/TimingOutTask.java
// ----------------------------------------------------------------------
// diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/TimingOutTask.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/TimingOutTask.java
// new file mode 100644
// index 0000000..c5b1b7b
// --- /dev/null
// +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/utils/TimingOutTask.java
// @@ -0,0 +1,56 @@
// +package org.taverna.server.localworker.impl.utils;
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */
// +
// +import javax.annotation.Nullable;
// +
// +/**
// + * A class that handles running a task that can take some time.
+ * + * @author Donal Fellows + * + */ +public abstract class TimingOutTask extends Thread { + public abstract void doIt() throws Exception; + + @Nullable + private Exception ioe; + + @Override + public final void run() { + try { + doIt(); + } catch (Exception ioe) { + this.ioe = ioe; + } + } + + public TimingOutTask() { + this.setDaemon(true); + } + + public void doOrTimeOut(long timeout) throws Exception { + start(); + try { + join(timeout); + } catch (InterruptedException e) { + interrupt(); + } + if (ioe != null) + throw ioe; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Constants.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Constants.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Constants.java deleted file mode 100644 index 4ee24ad..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Constants.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - */ -package org.taverna.server.localworker.api; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import static java.nio.charset.Charset.defaultCharset; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * The defaults associated with this worker, together with various other - * constants. - * - * @author Donal Fellows - */ -public abstract class Constants { - /** - * Subdirectories of the working directory to create by default. - */ - public static final String[] SUBDIR_LIST = { "conf", "externaltool", "feed", - "interactions", "lib", "logs", "plugins", "repository", "var" }; - - /** The name of the default encoding for characters on this machine. */ - public static final String SYSTEM_ENCODING = defaultCharset().name(); - - /** - * Password to use to encrypt security information. This default is <7 chars - * to work even without Unlimited Strength JCE. - */ - public static final char[] KEYSTORE_PASSWORD = { 'c', 'h', 'a', 'n', 'g', 'e' }; - - /** - * The name of the directory (in the home directory) where security settings - * will be written. - */ - public static final String SECURITY_DIR_NAME = ".taverna-server-security"; - - /** The name of the file that will be the created keystore. */ - public static final String KEYSTORE_FILE = "t2keystore.ubr"; - - /** The name of the file that will be the created truststore. */ - public static final String TRUSTSTORE_FILE = "t2truststore.ubr"; - - /** - * The name of the file that contains the password to unlock the keystore - * and truststore. - */ - public static final String PASSWORD_FILE = "password.txt"; - - // --------- UNUSED --------- - // /** - // * The name of the file that contains the mapping from URIs to keystore - // * aliases. - // */ - // public static final String URI_ALIAS_MAP = "urlmap.txt"; - - /** - * Used to instruct the Taverna credential manager to use a non-default - * location for user credentials. 
- */ - public static final String CREDENTIAL_MANAGER_DIRECTORY = "-cmdir"; - - /** - * Used to instruct the Taverna credential manager to take its master - * password from standard input. - */ - public static final String CREDENTIAL_MANAGER_PASSWORD = "-cmpassword"; - - /** - * Name of environment variable used to pass HELIO security tokens to - * workflows. - */ - // This technique is known to be insecure; bite me. - public static final String HELIO_TOKEN_NAME = "HELIO_CIS_TOKEN"; - - /** - * The name of the standard listener, which is installed by default. - */ - public static final String DEFAULT_LISTENER_NAME = "io"; - - /** - * Time to wait for the subprocess to wait, in milliseconds. - */ - public static final int START_WAIT_TIME = 1500; - - /** - * Time to wait for success or failure of a death-causing activity (i.e., - * sending a signal). - */ - public static final int DEATH_TIME = 333; - - /** - * The name of the file (in this code's resources) that provides the default - * security policy that we use. - */ - public static final String SECURITY_POLICY_FILE = "security.policy"; - - /** - * The Java property holding security policy info. - */ - public static final String SEC_POLICY_PROP = "java.security.policy"; - /** - * The Java property to set to make this code not try to enforce security - * policy. - */ - public static final String UNSECURE_PROP = "taverna.suppressrestrictions.rmi"; - /** - * The Java property that holds the name of the host name to enforce. - */ - public static final String RMI_HOST_PROP = "java.rmi.server.hostname"; - /** - * The default hostname to require in secure mode. This is the - * <i>resolved</i> version of "localhost". 
- */ - public static final String LOCALHOST; - static { - String h = "127.0.0.1"; // fallback - try { - h = InetAddress.getByName("localhost").getHostAddress(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } finally { - LOCALHOST = h; - } - } - - /** - * Time to wait during closing down this process. In milliseconds. - */ - public static final int DEATH_DELAY = 500; - /** - * The name of the property describing where shared directories should be - * located. - */ - public static final String SHARED_DIR_PROP = "taverna.sharedDirectory"; - - public static final String TIME = "/usr/bin/time"; -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/RunAccounting.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/RunAccounting.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/RunAccounting.java deleted file mode 100644 index dd18db0..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/RunAccounting.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - */ -package org.taverna.server.localworker.api; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - * @author Donal Fellows - */ -public interface RunAccounting { - /** - * Logs that a run has started executing. - */ - void runStarted(); - - /** - * Logs that a run has finished executing. - */ - void runCeased(); -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Worker.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Worker.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Worker.java deleted file mode 100644 index 52c7009..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/Worker.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - */ -package org.taverna.server.localworker.api; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.File; -import java.util.List; -import java.util.Map; - -import org.taverna.server.localworker.impl.LocalWorker; -import org.taverna.server.localworker.remote.ImplementationException; -import org.taverna.server.localworker.remote.RemoteListener; -import org.taverna.server.localworker.remote.RemoteStatus; -import org.taverna.server.localworker.server.UsageRecordReceiver; - -/** - * The interface between the connectivity layer and the thunk to the - * subprocesses. - * - * @author Donal Fellows - */ -public interface Worker { - /** - * Fire up the workflow. This causes a transition into the operating state. - * - * @param local - * The reference to the factory class for this worker. - * @param executeWorkflowCommand - * The command to run to execute the workflow. - * @param workflow - * The workflow document to execute. - * @param workingDir - * What directory to use as the working directory. - * @param inputBaclavaFile - * The baclava file to use for inputs, or <tt>null</tt> to use - * the other <b>input*</b> arguments' values. - * @param inputRealFiles - * A mapping of input names to files that supply them. Note that - * we assume that nothing mapped here will be mapped in - * <b>inputValues</b>. - * @param inputValues - * A mapping of input names to values to supply to them. Note - * that we assume that nothing mapped here will be mapped in - * <b>inputFiles</b>. - * @param inputDelimiters - * A mapping of input names to characters used to split them into - * lists. - * @param outputBaclavaFile - * What baclava file to write the output from the workflow into, - * or <tt>null</tt> to have it written into the <tt>out</tt> - * subdirectory. - * @param contextDirectory - * The directory containing the keystore and truststore. <i>Must - * not be <tt>null</tt>.</i> - * @param keystorePassword - * The password to the keystore and truststore. 
<i>Must not be - * <tt>null</tt>.</i> - * @param generateProvenance - * Whether to generate a run bundle containing provenance data. - * @param environment - * Any environment variables that need to be added to the - * invokation. - * @param masterToken - * The internal name of the workflow run. - * @param runtimeSettings - * List of configuration details for the forked runtime. - * @return Whether a successful start happened. - * @throws Exception - * If any of quite a large number of things goes wrong. - */ - boolean initWorker(LocalWorker local, String executeWorkflowCommand, - byte[] workflow, File workingDir, File inputBaclavaFile, - Map<String, File> inputRealFiles, Map<String, String> inputValues, - Map<String, String> inputDelimiters, File outputBaclavaFile, - File contextDirectory, char[] keystorePassword, - boolean generateProvenance, Map<String, String> environment, - String masterToken, List<String> runtimeSettings) throws Exception; - - /** - * Kills off the subprocess if it exists and is alive. - * - * @throws Exception - * if anything goes badly wrong when the worker is being killed - * off. - */ - void killWorker() throws Exception; - - /** - * Move the worker out of the stopped state and back to operating. - * - * @throws Exception - * if it fails (which it always does; operation currently - * unsupported). - */ - void startWorker() throws Exception; - - /** - * Move the worker into the stopped state from the operating state. - * - * @throws Exception - * if it fails (which it always does; operation currently - * unsupported). - */ - void stopWorker() throws Exception; - - /** - * @return The status of the workflow run. Note that this can be an - * expensive operation. - */ - RemoteStatus getWorkerStatus(); - - /** - * @return The listener that is registered by default, in addition to all - * those that are explicitly registered by the user. 
- */ - RemoteListener getDefaultListener(); - - /** - * @param receiver - * The destination where any final usage records are to be - * written in order to log them back to the server. - */ - void setURReceiver(UsageRecordReceiver receiver); - - /** - * Arrange for the deletion of any resources created during worker process - * construction. Guaranteed to be the last thing done before finalization. - * - * @throws ImplementationException - * If anything goes wrong. - */ - void deleteLocalResources() throws ImplementationException; -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/WorkerFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/WorkerFactory.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/WorkerFactory.java deleted file mode 100644 index 0fd2d20..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/api/WorkerFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.taverna.server.localworker.api; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Class that manufactures instances of {@link Worker}. - * - * @author Donal Fellows - */ -public interface WorkerFactory { - /** - * Create an instance of the low-level worker class. - * - * @return The worker object. - * @throws Exception - * If anything goes wrong. - */ - Worker makeInstance() throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/DirectoryDelegate.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/DirectoryDelegate.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/DirectoryDelegate.java deleted file mode 100644 index 6b7ba77..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/DirectoryDelegate.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - */ -package org.taverna.server.localworker.impl; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import static org.apache.commons.io.FileUtils.forceDelete; -import static org.apache.commons.io.FileUtils.forceMkdir; -import static org.apache.commons.io.FileUtils.touch; -import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedNewFile; - -import java.io.File; -import java.io.IOException; -import java.rmi.RemoteException; -import java.rmi.server.UnicastRemoteObject; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; - -import javax.annotation.Nonnull; - -import org.apache.commons.collections.MapIterator; -import org.apache.commons.collections.map.ReferenceMap; -import org.taverna.server.localworker.remote.RemoteDirectory; -import org.taverna.server.localworker.remote.RemoteDirectoryEntry; -import org.taverna.server.localworker.remote.RemoteFile; - -/** - * This class acts as a remote-aware delegate for the workflow run's working - * directory and its subdirectories. - * - * @author Donal Fellows - * @see FileDelegate - */ -@SuppressWarnings("serial") -public class DirectoryDelegate extends UnicastRemoteObject implements - RemoteDirectory { - private File dir; - private DirectoryDelegate parent; - private ReferenceMap localCache; - - /** - * @param dir - * @param parent - * @throws RemoteException - * If registration of the directory fails. 
- */ - public DirectoryDelegate(@Nonnull File dir, - @Nonnull DirectoryDelegate parent) throws RemoteException { - super(); - this.localCache = new ReferenceMap(); - this.dir = dir; - this.parent = parent; - } - - @Override - public Collection<RemoteDirectoryEntry> getContents() - throws RemoteException { - List<RemoteDirectoryEntry> result = new ArrayList<>(); - for (String s : dir.list()) { - if (s.equals(".") || s.equals("..")) - continue; - File f = new File(dir, s); - RemoteDirectoryEntry entry; - synchronized (localCache) { - entry = (RemoteDirectoryEntry) localCache.get(s); - if (f.isDirectory()) { - if (entry == null || !(entry instanceof DirectoryDelegate)) { - entry = new DirectoryDelegate(f, this); - localCache.put(s, entry); - } - } else if (f.isFile()) { - if (entry == null || !(entry instanceof FileDelegate)) { - entry = new FileDelegate(f, this); - localCache.put(s, entry); - } - } else { - // not file or dir; skip... - continue; - } - } - result.add(entry); - } - return result; - } - - @Override - public RemoteFile makeEmptyFile(String name) throws IOException { - File f = getValidatedNewFile(dir, name); - touch(f); - FileDelegate delegate = new FileDelegate(f, this); - synchronized (localCache) { - localCache.put(name, delegate); - } - return delegate; - } - - @Override - public RemoteDirectory makeSubdirectory(String name) throws IOException { - File f = getValidatedNewFile(dir, name); - forceMkdir(f); - DirectoryDelegate delegate = new DirectoryDelegate(f, this); - synchronized (localCache) { - localCache.put(name, delegate); - } - return delegate; - } - - @SuppressWarnings("unchecked") - @Override - public void destroy() throws IOException { - if (parent == null) - throw new IOException("tried to destroy main job working directory"); - Collection<RemoteDirectoryEntry> values; - synchronized (localCache) { - values = new ArrayList<>(localCache.values()); - } - for (RemoteDirectoryEntry obj : values) { - if (obj == null) - continue; - try { - 
obj.destroy(); - } catch (IOException e) { - } - } - forceDelete(dir); - parent.forgetEntry(this); - } - - @Override - public RemoteDirectory getContainingDirectory() { - return parent; - } - - void forgetEntry(@Nonnull RemoteDirectoryEntry entry) { - synchronized (localCache) { - MapIterator i = localCache.mapIterator(); - while (i.hasNext()) { - Object key = i.next(); - if (entry == i.getValue()) { - localCache.remove(key); - break; - } - } - } - } - - @Override - public String getName() { - if (parent == null) - return ""; - return dir.getName(); - } - - @Override - public Date getModificationDate() throws RemoteException { - return new Date(dir.lastModified()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/FileDelegate.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/FileDelegate.java b/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/FileDelegate.java deleted file mode 100644 index 8dd9ede..0000000 --- a/taverna-server-worker/src/main/java/org/taverna/server/localworker/impl/FileDelegate.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - */ -package org.taverna.server.localworker.impl; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import static java.lang.System.arraycopy; -import static java.net.InetAddress.getLocalHost; -import static org.apache.commons.io.FileUtils.copyFile; -import static org.apache.commons.io.FileUtils.forceDelete; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.UnknownHostException; -import java.rmi.RemoteException; -import java.rmi.server.UnicastRemoteObject; -import java.util.Date; - -import javax.annotation.Nonnull; - -import org.taverna.server.localworker.remote.RemoteDirectory; -import org.taverna.server.localworker.remote.RemoteFile; - -/** - * This class acts as a remote-aware delegate for the files in a workflow run's - * working directory and its subdirectories. - * - * @author Donal Fellows - * @see DirectoryDelegate - */ -@java.lang.SuppressWarnings("serial") -public class FileDelegate extends UnicastRemoteObject implements RemoteFile { - private File file; - private DirectoryDelegate parent; - - /** - * @param file - * @param parent - * @throws RemoteException - * If registration of the file fails. 
- */ - public FileDelegate(@Nonnull File file, @Nonnull DirectoryDelegate parent) - throws RemoteException { - super(); - this.file = file; - this.parent = parent; - } - - @Override - public byte[] getContents(int offset, int length) throws IOException { - if (length == -1) - length = (int) (file.length() - offset); - if (length < 0 || length > 1024 * 64) - length = 1024 * 64; - byte[] buffer = new byte[length]; - int read; - try (FileInputStream fis = new FileInputStream(file)) { - if (offset > 0 && fis.skip(offset) != offset) - throw new IOException("did not move to correct offset in file"); - read = fis.read(buffer); - } - if (read <= 0) - return new byte[0]; - if (read < buffer.length) { - byte[] shortened = new byte[read]; - arraycopy(buffer, 0, shortened, 0, read); - return shortened; - } - return buffer; - } - - @Override - public long getSize() { - return file.length(); - } - - @Override - public void setContents(byte[] data) throws IOException { - try (FileOutputStream fos = new FileOutputStream(file)) { - fos.write(data); - } - } - - @Override - public void appendContents(byte[] data) throws IOException { - try (FileOutputStream fos = new FileOutputStream(file, true)) { - fos.write(data); - } - } - - @Override - public void destroy() throws IOException { - forceDelete(file); - parent.forgetEntry(this); - parent = null; - } - - @Override - public RemoteDirectory getContainingDirectory() { - return parent; - } - - @Override - public String getName() { - return file.getName(); - } - - @Override - public void copy(RemoteFile sourceFile) throws RemoteException, IOException { - String sourceHost = sourceFile.getNativeHost(); - if (!getNativeHost().equals(sourceHost)) { - throw new IOException( - "cross-system copy not implemented; cannot copy from " - + sourceHost + " to " + getNativeHost()); - } - // Must copy; cannot count on other file to stay unmodified - copyFile(new File(sourceFile.getNativeName()), file); - } - - @Override - public String getNativeName() 
{ - return file.getAbsolutePath(); - } - - @Override - public String getNativeHost() { - try { - return getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - throw new RuntimeException( - "unexpected failure to resolve local host address", e); - } - } - - @Override - public Date getModificationDate() throws RemoteException { - return new Date(file.lastModified()); - } -}
