http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/MockPolicy.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/MockPolicy.java b/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/MockPolicy.java deleted file mode 100644 index b61fc10..0000000 --- a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/MockPolicy.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.taverna.server.master.mocks; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.util.HashSet; -import java.util.Set; - -import org.taverna.server.master.common.Workflow; -import org.taverna.server.master.exceptions.NoCreateException; -import org.taverna.server.master.exceptions.NoDestroyException; -import org.taverna.server.master.exceptions.NoUpdateException; -import org.taverna.server.master.interfaces.TavernaRun; -import org.taverna.server.master.utils.UsernamePrincipal; - -public class MockPolicy extends SimpleServerPolicy { - public MockPolicy() { - super(); - super.setCleanerInterval(30); - } - - public int maxruns = 10; - Integer usermaxruns; - Set<TavernaRun> denyaccess = new HashSet<>(); - boolean exnOnUpdate, exnOnCreate, exnOnDelete; - - @Override - public int getMaxRuns() { - return maxruns; - } - - @Override - public Integer getMaxRuns(UsernamePrincipal user) { - return usermaxruns; - } - - @Override - public boolean permitAccess(UsernamePrincipal user, TavernaRun run) { - return !denyaccess.contains(run); - } - - @Override - public void permitCreate(UsernamePrincipal user, Workflow workflow) - throws NoCreateException { - if (this.exnOnCreate) - throw new NoCreateException(); - } - - @Override - public void permitDestroy(UsernamePrincipal user, TavernaRun run) - throws NoDestroyException { - if (this.exnOnDelete) - throw new NoDestroyException(); - } - - @Override - public void permitUpdate(UsernamePrincipal user, TavernaRun run) - throws NoUpdateException { - if (this.exnOnUpdate) - throw new NoUpdateException(); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleListenerFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleListenerFactory.java b/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleListenerFactory.java deleted file mode 100644 index 7d9c998..0000000 --- a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleListenerFactory.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.taverna.server.master.mocks; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.taverna.server.master.exceptions.NoListenerException; -import org.taverna.server.master.factories.ListenerFactory; -import org.taverna.server.master.interfaces.Listener; -import org.taverna.server.master.interfaces.TavernaRun; - -/** - * A factory for event listener. The factory is configured using Spring. - * - * @author Donal Fellows - */ -public class SimpleListenerFactory implements ListenerFactory { - private Map<String, Builder> builders = new HashMap<>(); - - public void setBuilders(Map<String, Builder> builders) { - this.builders = builders; - } - - @Override - public List<String> getSupportedListenerTypes() { - return new ArrayList<>(builders.keySet()); - } - - @Override - public Listener makeListener(TavernaRun run, String listenerType, - String configuration) throws NoListenerException { - Builder b = builders.get(listenerType); - if (b == null) - throw new NoListenerException("no such listener type"); - Listener l = b.build(run, configuration); - run.addListener(l); - return l; - } - - /** - * How to actually construct a listener. - * - * @author Donal Fellows - */ - public interface Builder { - /** - * Make an event listener attached to a run. - * - * @param run - * The run to attach to. - * @param configuration - * A user-specified configuration document. The constructed - * listener <i>should</i> process this configuration document - * and be able to return it to the user when requested. - * @return The listener object. - * @throws NoListenerException - * If the listener construction failed or the - * <b>configuration</b> document was bad in some way. - */ - public Listener build(TavernaRun run, String configuration) - throws NoListenerException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleNonpersistentRunStore.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleNonpersistentRunStore.java b/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleNonpersistentRunStore.java deleted file mode 100644 index 63b6754..0000000 --- a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleNonpersistentRunStore.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.taverna.server.master.mocks; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.lang.ref.WeakReference; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import org.taverna.server.master.exceptions.NoDestroyException; -import org.taverna.server.master.exceptions.UnknownRunException; -import org.taverna.server.master.interfaces.Policy; -import org.taverna.server.master.interfaces.RunStore; -import org.taverna.server.master.interfaces.TavernaRun; -import org.taverna.server.master.utils.UsernamePrincipal; - -/** - * Example of a store for Taverna Workflow Runs. - * - * @author Donal Fellows - */ -public class SimpleNonpersistentRunStore implements RunStore { - private Map<String, TavernaRun> store = new HashMap<>(); - private Object lock = new Object(); - - Timer timer; - private CleanerTask cleaner; - - /** - * The connection to the main policy store. Suitable for wiring up with - * Spring. - * - * @param p - * The policy to connect to. - */ - public void setPolicy(SimpleServerPolicy p) { - p.store = this; - cleanerIntervalUpdated(p.getCleanerInterval()); - } - - public SimpleNonpersistentRunStore() { - timer = new Timer("SimpleNonpersistentRunStore.CleanerTimer", true); - cleanerIntervalUpdated(300); - } - - @Override - protected void finalize() { - timer.cancel(); - } - - /** - * Remove and destroy all runs that are expired at the moment that this - * method starts. - */ - void clean() { - Date now = new Date(); - synchronized (lock) { - // Use an iterator so we have access to its remove() method... - Iterator<TavernaRun> i = store.values().iterator(); - while (i.hasNext()) { - TavernaRun w = i.next(); - if (w.getExpiry().before(now)) { - i.remove(); - try { - w.destroy(); - } catch (NoDestroyException e) { - } - } - } - } - } - - /** - * Reconfigure the cleaner task's call interval. This is called internally - * and from the Policy when the interval is set there. - * - * @param intervalInSeconds - * How long between runs of the cleaner task, in seconds. - */ - void cleanerIntervalUpdated(int intervalInSeconds) { - if (cleaner != null) - cleaner.cancel(); - cleaner = new CleanerTask(this, intervalInSeconds); - } - - @Override - public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid) - throws UnknownRunException { - synchronized (lock) { - TavernaRun w = store.get(uuid); - if (w == null || !p.permitAccess(user, w)) - throw new UnknownRunException(); - return w; - } - } - - @Override - public TavernaRun getRun(String uuid) throws UnknownRunException { - synchronized (lock) { - TavernaRun w = store.get(uuid); - if (w == null) - throw new UnknownRunException(); - return w; - } - } - - @Override - public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) { - Map<String, TavernaRun> filtered = new HashMap<>(); - synchronized (lock) { - for (Map.Entry<String, TavernaRun> entry : store.entrySet()) - if (p.permitAccess(user, entry.getValue())) - filtered.put(entry.getKey(), entry.getValue()); - } - return filtered; - } - - @Override - public String registerRun(TavernaRun run) { - synchronized (lock) { - store.put(run.getId(), run); - return run.getId(); - } - } - - @Override - public void unregisterRun(String uuid) { - synchronized (lock) { - store.remove(uuid); - } - } -} - -class CleanerTask extends TimerTask { - WeakReference<SimpleNonpersistentRunStore> store; - - CleanerTask(SimpleNonpersistentRunStore store, int interval) { - this.store = new WeakReference<>(store); - int tms = interval * 1000; - store.timer.scheduleAtFixedRate(this, tms, tms); - } - - @Override - public void run() { - SimpleNonpersistentRunStore s = store.get(); - if (s != null) { - s.clean(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleServerPolicy.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleServerPolicy.java b/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleServerPolicy.java deleted file mode 100644 index 8dd2757..0000000 --- a/taverna-server-webapp/src/test/java/org/taverna/server/master/mocks/SimpleServerPolicy.java +++ /dev/null @@ -1,126 +0,0 @@ -package org.taverna.server.master.mocks; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.net.URI; -import java.util.List; - -import org.taverna.server.master.common.Workflow; -import org.taverna.server.master.exceptions.NoCreateException; -import org.taverna.server.master.exceptions.NoDestroyException; -import org.taverna.server.master.exceptions.NoUpdateException; -import org.taverna.server.master.interfaces.Policy; -import org.taverna.server.master.interfaces.TavernaRun; -import org.taverna.server.master.utils.UsernamePrincipal; - -/** - * A very simple (and unsafe) security model. The number of runs is configurable - * through Spring (or 10 if unconfigured) with no per-user limits supported, all - * workflows are permitted, and all identified users may create a workflow run. - * Any user may read off information about any run, but only its owner may - * modify or destroy it. - * <p> - * Note that this is a <i>Policy Enforcement Point</i> for access control to - * individual workflows. - * - * @author Donal Fellows - */ -public class SimpleServerPolicy implements Policy { - private int maxRuns = 10; - private int cleanerInterval; - SimpleNonpersistentRunStore store; - - public void setMaxRuns(int maxRuns) { - this.maxRuns = maxRuns; - } - - @Override - public int getMaxRuns() { - return maxRuns; - } - - @Override - public Integer getMaxRuns(UsernamePrincipal p) { - return null; // No per-user limits - } - - public int getCleanerInterval() { - return cleanerInterval; - } - - /** - * Sets how often the store of workflow runs will try to clean out expired - * runs. - * - * @param intervalInSeconds - */ - public void setCleanerInterval(int intervalInSeconds) { - cleanerInterval = intervalInSeconds; - if (store != null) - store.cleanerIntervalUpdated(intervalInSeconds); - } - - @Override - public boolean permitAccess(UsernamePrincipal p, TavernaRun run) { - // No secrets here! - return true; - } - - @Override - public void permitCreate(UsernamePrincipal p, Workflow workflow) - throws NoCreateException { - // Only identified users may create - if (p == null) - throw new NoCreateException(); - // Global run count limit enforcement - if (store.listRuns(p, this).size() >= maxRuns) - throw new NoCreateException(); - // Per-user run count enforcement would come here - } - - @Override - public void permitDestroy(UsernamePrincipal p, TavernaRun run) - throws NoDestroyException { - // Only the creator may destroy - if (p == null || !p.equals(run.getSecurityContext().getOwner())) - throw new NoDestroyException(); - } - - @Override - public void permitUpdate(UsernamePrincipal p, TavernaRun run) - throws NoUpdateException { - // Only the creator may change - if (p == null || !p.equals(run.getSecurityContext().getOwner())) - throw new NoUpdateException(); - } - - @Override - public int getOperatingLimit() { - return 1; - } - - @Override - public List<URI> listPermittedWorkflowURIs(UsernamePrincipal user) { - return null; - } - - @Override - public void setPermittedWorkflowURIs(UsernamePrincipal user, - List<URI> permitted) { - // Ignore - } -} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Constants.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Constants.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Constants.java new file mode 100644 index 0000000..4ee24ad --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Constants.java @@ -0,0 +1,154 @@ +/* + */ +package org.taverna.server.localworker.api; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.nio.charset.Charset.defaultCharset; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * The defaults associated with this worker, together with various other + * constants. + * + * @author Donal Fellows + */ +public abstract class Constants { + /** + * Subdirectories of the working directory to create by default. + */ + public static final String[] SUBDIR_LIST = { "conf", "externaltool", "feed", + "interactions", "lib", "logs", "plugins", "repository", "var" }; + + /** The name of the default encoding for characters on this machine. */ + public static final String SYSTEM_ENCODING = defaultCharset().name(); + + /** + * Password to use to encrypt security information. This default is <7 chars + * to work even without Unlimited Strength JCE. + */ + public static final char[] KEYSTORE_PASSWORD = { 'c', 'h', 'a', 'n', 'g', 'e' }; + + /** + * The name of the directory (in the home directory) where security settings + * will be written. + */ + public static final String SECURITY_DIR_NAME = ".taverna-server-security"; + + /** The name of the file that will be the created keystore. */ + public static final String KEYSTORE_FILE = "t2keystore.ubr"; + + /** The name of the file that will be the created truststore. */ + public static final String TRUSTSTORE_FILE = "t2truststore.ubr"; + + /** + * The name of the file that contains the password to unlock the keystore + * and truststore. + */ + public static final String PASSWORD_FILE = "password.txt"; + + // --------- UNUSED --------- + // /** + // * The name of the file that contains the mapping from URIs to keystore + // * aliases. + // */ + // public static final String URI_ALIAS_MAP = "urlmap.txt"; + + /** + * Used to instruct the Taverna credential manager to use a non-default + * location for user credentials. + */ + public static final String CREDENTIAL_MANAGER_DIRECTORY = "-cmdir"; + + /** + * Used to instruct the Taverna credential manager to take its master + * password from standard input. + */ + public static final String CREDENTIAL_MANAGER_PASSWORD = "-cmpassword"; + + /** + * Name of environment variable used to pass HELIO security tokens to + * workflows. + */ + // This technique is known to be insecure; bite me. + public static final String HELIO_TOKEN_NAME = "HELIO_CIS_TOKEN"; + + /** + * The name of the standard listener, which is installed by default. + */ + public static final String DEFAULT_LISTENER_NAME = "io"; + + /** + * Time to wait for the subprocess to wait, in milliseconds. + */ + public static final int START_WAIT_TIME = 1500; + + /** + * Time to wait for success or failure of a death-causing activity (i.e., + * sending a signal). + */ + public static final int DEATH_TIME = 333; + + /** + * The name of the file (in this code's resources) that provides the default + * security policy that we use. + */ + public static final String SECURITY_POLICY_FILE = "security.policy"; + + /** + * The Java property holding security policy info. + */ + public static final String SEC_POLICY_PROP = "java.security.policy"; + /** + * The Java property to set to make this code not try to enforce security + * policy. + */ + public static final String UNSECURE_PROP = "taverna.suppressrestrictions.rmi"; + /** + * The Java property that holds the name of the host name to enforce. + */ + public static final String RMI_HOST_PROP = "java.rmi.server.hostname"; + /** + * The default hostname to require in secure mode. This is the + * <i>resolved</i> version of "localhost". + */ + public static final String LOCALHOST; + static { + String h = "127.0.0.1"; // fallback + try { + h = InetAddress.getByName("localhost").getHostAddress(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } finally { + LOCALHOST = h; + } + } + + /** + * Time to wait during closing down this process. In milliseconds. + */ + public static final int DEATH_DELAY = 500; + /** + * The name of the property describing where shared directories should be + * located. + */ + public static final String SHARED_DIR_PROP = "taverna.sharedDirectory"; + + public static final String TIME = "/usr/bin/time"; +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/RunAccounting.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/RunAccounting.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/RunAccounting.java new file mode 100644 index 0000000..dd18db0 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/RunAccounting.java @@ -0,0 +1,35 @@ +/* + */ +package org.taverna.server.localworker.api; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + * @author Donal Fellows + */ +public interface RunAccounting { + /** + * Logs that a run has started executing. + */ + void runStarted(); + + /** + * Logs that a run has finished executing. + */ + void runCeased(); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Worker.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Worker.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Worker.java new file mode 100644 index 0000000..52c7009 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/Worker.java @@ -0,0 +1,148 @@ +/* + */ +package org.taverna.server.localworker.api; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.util.List; +import java.util.Map; + +import org.taverna.server.localworker.impl.LocalWorker; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.server.UsageRecordReceiver; + +/** + * The interface between the connectivity layer and the thunk to the + * subprocesses. + * + * @author Donal Fellows + */ +public interface Worker { + /** + * Fire up the workflow. This causes a transition into the operating state. + * + * @param local + * The reference to the factory class for this worker. + * @param executeWorkflowCommand + * The command to run to execute the workflow. + * @param workflow + * The workflow document to execute. + * @param workingDir + * What directory to use as the working directory. + * @param inputBaclavaFile + * The baclava file to use for inputs, or <tt>null</tt> to use + * the other <b>input*</b> arguments' values. + * @param inputRealFiles + * A mapping of input names to files that supply them. Note that + * we assume that nothing mapped here will be mapped in + * <b>inputValues</b>. + * @param inputValues + * A mapping of input names to values to supply to them. Note + * that we assume that nothing mapped here will be mapped in + * <b>inputFiles</b>. + * @param inputDelimiters + * A mapping of input names to characters used to split them into + * lists. + * @param outputBaclavaFile + * What baclava file to write the output from the workflow into, + * or <tt>null</tt> to have it written into the <tt>out</tt> + * subdirectory. + * @param contextDirectory + * The directory containing the keystore and truststore. <i>Must + * not be <tt>null</tt>.</i> + * @param keystorePassword + * The password to the keystore and truststore. <i>Must not be + * <tt>null</tt>.</i> + * @param generateProvenance + * Whether to generate a run bundle containing provenance data. + * @param environment + * Any environment variables that need to be added to the + * invokation. + * @param masterToken + * The internal name of the workflow run. + * @param runtimeSettings + * List of configuration details for the forked runtime. + * @return Whether a successful start happened. + * @throws Exception + * If any of quite a large number of things goes wrong. + */ + boolean initWorker(LocalWorker local, String executeWorkflowCommand, + byte[] workflow, File workingDir, File inputBaclavaFile, + Map<String, File> inputRealFiles, Map<String, String> inputValues, + Map<String, String> inputDelimiters, File outputBaclavaFile, + File contextDirectory, char[] keystorePassword, + boolean generateProvenance, Map<String, String> environment, + String masterToken, List<String> runtimeSettings) throws Exception; + + /** + * Kills off the subprocess if it exists and is alive. + * + * @throws Exception + * if anything goes badly wrong when the worker is being killed + * off. + */ + void killWorker() throws Exception; + + /** + * Move the worker out of the stopped state and back to operating. + * + * @throws Exception + * if it fails (which it always does; operation currently + * unsupported). + */ + void startWorker() throws Exception; + + /** + * Move the worker into the stopped state from the operating state. + * + * @throws Exception + * if it fails (which it always does; operation currently + * unsupported). + */ + void stopWorker() throws Exception; + + /** + * @return The status of the workflow run. Note that this can be an + * expensive operation. + */ + RemoteStatus getWorkerStatus(); + + /** + * @return The listener that is registered by default, in addition to all + * those that are explicitly registered by the user. + */ + RemoteListener getDefaultListener(); + + /** + * @param receiver + * The destination where any final usage records are to be + * written in order to log them back to the server. + */ + void setURReceiver(UsageRecordReceiver receiver); + + /** + * Arrange for the deletion of any resources created during worker process + * construction. Guaranteed to be the last thing done before finalization. + * + * @throws ImplementationException + * If anything goes wrong. + */ + void deleteLocalResources() throws ImplementationException; +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/WorkerFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/WorkerFactory.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/WorkerFactory.java new file mode 100644 index 0000000..0fd2d20 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/api/WorkerFactory.java @@ -0,0 +1,34 @@ +package org.taverna.server.localworker.api; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * Class that manufactures instances of {@link Worker}. + * + * @author Donal Fellows + */ +public interface WorkerFactory { + /** + * Create an instance of the low-level worker class. + * + * @return The worker object. + * @throws Exception + * If anything goes wrong. + */ + Worker makeInstance() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/DirectoryDelegate.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/DirectoryDelegate.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/DirectoryDelegate.java new file mode 100644 index 0000000..6b7ba77 --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/DirectoryDelegate.java @@ -0,0 +1,174 @@ +/* + */ +package org.taverna.server.localworker.impl; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static org.apache.commons.io.FileUtils.forceDelete; +import static org.apache.commons.io.FileUtils.forceMkdir; +import static org.apache.commons.io.FileUtils.touch; +import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedNewFile; + +import java.io.File; +import java.io.IOException; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import javax.annotation.Nonnull; + +import org.apache.commons.collections.MapIterator; +import org.apache.commons.collections.map.ReferenceMap; +import org.taverna.server.localworker.remote.RemoteDirectory; +import org.taverna.server.localworker.remote.RemoteDirectoryEntry; +import org.taverna.server.localworker.remote.RemoteFile; + +/** + * This class acts as a remote-aware delegate for the workflow run's working + * directory and its subdirectories. + * + * @author Donal Fellows + * @see FileDelegate + */ +@SuppressWarnings("serial") +public class DirectoryDelegate extends UnicastRemoteObject implements + RemoteDirectory { + private File dir; + private DirectoryDelegate parent; + private ReferenceMap localCache; + + /** + * @param dir + * @param parent + * @throws RemoteException + * If registration of the directory fails. + */ + public DirectoryDelegate(@Nonnull File dir, + @Nonnull DirectoryDelegate parent) throws RemoteException { + super(); + this.localCache = new ReferenceMap(); + this.dir = dir; + this.parent = parent; + } + + @Override + public Collection<RemoteDirectoryEntry> getContents() + throws RemoteException { + List<RemoteDirectoryEntry> result = new ArrayList<>(); + for (String s : dir.list()) { + if (s.equals(".") || s.equals("..")) + continue; + File f = new File(dir, s); + RemoteDirectoryEntry entry; + synchronized (localCache) { + entry = (RemoteDirectoryEntry) localCache.get(s); + if (f.isDirectory()) { + if (entry == null || !(entry instanceof DirectoryDelegate)) { + entry = new DirectoryDelegate(f, this); + localCache.put(s, entry); + } + } else if (f.isFile()) { + if (entry == null || !(entry instanceof FileDelegate)) { + entry = new FileDelegate(f, this); + localCache.put(s, entry); + } + } else { + // not file or dir; skip... + continue; + } + } + result.add(entry); + } + return result; + } + + @Override + public RemoteFile makeEmptyFile(String name) throws IOException { + File f = getValidatedNewFile(dir, name); + touch(f); + FileDelegate delegate = new FileDelegate(f, this); + synchronized (localCache) { + localCache.put(name, delegate); + } + return delegate; + } + + @Override + public RemoteDirectory makeSubdirectory(String name) throws IOException { + File f = getValidatedNewFile(dir, name); + forceMkdir(f); + DirectoryDelegate delegate = new DirectoryDelegate(f, this); + synchronized (localCache) { + localCache.put(name, delegate); + } + return delegate; + } + + @SuppressWarnings("unchecked") + @Override + public void destroy() throws IOException { + if (parent == null) + throw new IOException("tried to destroy main job working directory"); + Collection<RemoteDirectoryEntry> values; + synchronized (localCache) { + values = new ArrayList<>(localCache.values()); + } + for (RemoteDirectoryEntry obj : values) { + if (obj == null) + continue; + try { + obj.destroy(); + } catch (IOException e) { + } + } + forceDelete(dir); + parent.forgetEntry(this); + } + + @Override + public RemoteDirectory getContainingDirectory() { + return parent; + } + + void forgetEntry(@Nonnull RemoteDirectoryEntry entry) { + synchronized (localCache) { + MapIterator i = localCache.mapIterator(); + while (i.hasNext()) { + Object key = i.next(); + if (entry == i.getValue()) { + localCache.remove(key); + break; + } + } + } + } + + @Override + public String getName() { + if (parent == null) + return ""; + return dir.getName(); + } + + @Override + public Date getModificationDate() throws RemoteException { + return new Date(dir.lastModified()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/FileDelegate.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/FileDelegate.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/FileDelegate.java new file mode 100644 index 0000000..8dd9ede --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/FileDelegate.java @@ -0,0 +1,155 @@ +/* + */ +package org.taverna.server.localworker.impl; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.lang.System.arraycopy; +import static java.net.InetAddress.getLocalHost; +import static org.apache.commons.io.FileUtils.copyFile; +import static org.apache.commons.io.FileUtils.forceDelete; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.UnknownHostException; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.Date; + +import javax.annotation.Nonnull; + +import org.taverna.server.localworker.remote.RemoteDirectory; +import org.taverna.server.localworker.remote.RemoteFile; + +/** + * This class acts as a remote-aware delegate for the files in a workflow run's + * working directory and its subdirectories. + * + * @author Donal Fellows + * @see DirectoryDelegate + */ [email protected]("serial") +public class FileDelegate extends UnicastRemoteObject implements RemoteFile { + private File file; + private DirectoryDelegate parent; + + /** + * @param file + * @param parent + * @throws RemoteException + * If registration of the file fails. + */ + public FileDelegate(@Nonnull File file, @Nonnull DirectoryDelegate parent) + throws RemoteException { + super(); + this.file = file; + this.parent = parent; + } + + @Override + public byte[] getContents(int offset, int length) throws IOException { + if (length == -1) + length = (int) (file.length() - offset); + if (length < 0 || length > 1024 * 64) + length = 1024 * 64; + byte[] buffer = new byte[length]; + int read; + try (FileInputStream fis = new FileInputStream(file)) { + if (offset > 0 && fis.skip(offset) != offset) + throw new IOException("did not move to correct offset in file"); + read = fis.read(buffer); + } + if (read <= 0) + return new byte[0]; + if (read < buffer.length) { + byte[] shortened = new byte[read]; + arraycopy(buffer, 0, shortened, 0, read); + return shortened; + } + return buffer; + } + + @Override + public long getSize() { + return file.length(); + } + + @Override + public void setContents(byte[] data) throws IOException { + try (FileOutputStream fos = new FileOutputStream(file)) { + fos.write(data); + } + } + + @Override + public void appendContents(byte[] data) throws IOException { + try (FileOutputStream fos = new FileOutputStream(file, true)) { + fos.write(data); + } + } + + @Override + public void destroy() throws IOException { + forceDelete(file); + parent.forgetEntry(this); + parent = null; + } + + @Override + public RemoteDirectory getContainingDirectory() { + return parent; + } + + @Override + public String getName() { + return file.getName(); + } + + @Override + public void copy(RemoteFile sourceFile) throws RemoteException, IOException { + String sourceHost = sourceFile.getNativeHost(); + if (!getNativeHost().equals(sourceHost)) { + throw new IOException( + "cross-system copy not implemented; cannot copy from " + + sourceHost + " to " + getNativeHost()); + } + // Must copy; cannot count on other file to stay unmodified + copyFile(new File(sourceFile.getNativeName()), file); + } + + @Override + public String getNativeName() { + return file.getAbsolutePath(); + } + + @Override + public String getNativeHost() { + try { + return getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new RuntimeException( + "unexpected failure to resolve local host address", e); + } + } + + @Override + public Date getModificationDate() throws RemoteException { + return new Date(file.lastModified()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/LocalWorker.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/LocalWorker.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/LocalWorker.java new file mode 100644 index 0000000..f96f91c --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/LocalWorker.java @@ -0,0 +1,782 @@ +/* + */ +package org.taverna.server.localworker.impl; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.lang.Runtime.getRuntime; +import static java.lang.System.getProperty; +import static java.lang.System.out; +import static java.lang.management.ManagementFactory.getRuntimeMXBean; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.UUID.randomUUID; +import static org.apache.commons.io.FileUtils.forceDelete; +import static org.apache.commons.io.FileUtils.forceMkdir; +import static org.apache.commons.io.FileUtils.writeByteArrayToFile; +import static org.apache.commons.io.FileUtils.writeLines; +import static org.taverna.server.localworker.api.Constants.HELIO_TOKEN_NAME; +import static org.taverna.server.localworker.api.Constants.KEYSTORE_FILE; +import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD; +import static org.taverna.server.localworker.api.Constants.SECURITY_DIR_NAME; +import static org.taverna.server.localworker.api.Constants.SHARED_DIR_PROP; +import static org.taverna.server.localworker.api.Constants.SUBDIR_LIST; +import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING; +import static org.taverna.server.localworker.api.Constants.TRUSTSTORE_FILE; +import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedFile; +import static org.taverna.server.localworker.remote.RemoteStatus.Finished; +import static org.taverna.server.localworker.remote.RemoteStatus.Initialized; +import static org.taverna.server.localworker.remote.RemoteStatus.Operating; +import static org.taverna.server.localworker.remote.RemoteStatus.Stopped; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.rmi.RemoteException; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.api.WorkerFactory; +import org.taverna.server.localworker.remote.IllegalStateTransitionException; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteDirectory; +import org.taverna.server.localworker.remote.RemoteInput; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteSecurityContext; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.remote.StillWorkingOnItException; +import org.taverna.server.localworker.server.UsageRecordReceiver; + +/** + * This class implements one side of the connection between the Taverna Server + * master server and this process. It delegates to a {@link Worker} instance the + * handling of actually running a workflow. + * + * @author Donal Fellows + * @see DirectoryDelegate + * @see FileDelegate + * @see WorkerCore + */ +@SuppressWarnings("serial") +public class LocalWorker extends UnicastRemoteObject implements RemoteSingleRun { + // ----------------------- CONSTANTS ----------------------- + + /** Handle to the directory containing the security info. */ + static final File SECURITY_DIR; + static final String SLASHTEMP; + static { + SLASHTEMP = getProperty("java.io.tmpdir"); + File home = new File(getProperty("user.home")); + // If we can't write to $HOME (i.e., we're in an odd deployment) use + // the official version of /tmp/$PID as a fallback. + if (!home.canWrite()) + home = new File(SLASHTEMP, getRuntimeMXBean().getName()); + SECURITY_DIR = new File(home, SECURITY_DIR_NAME); + } + + // ----------------------- VARIABLES ----------------------- + + /** + * Magic flag used to turn off problematic code when testing inside CI + * environment. + */ + static boolean DO_MKDIR = true; + + /** What to use to run a workflow engine. */ + private final String executeWorkflowCommand; + /** What workflow to run. */ + private final byte[] workflow; + /** The remote access object for the working directory. */ + private final DirectoryDelegate baseDir; + /** What inputs to pass as files. */ + final Map<String, String> inputFiles; + /** What inputs to pass as files (as file refs). */ + final Map<String, File> inputRealFiles; + /** What inputs to pass as direct values. */ + final Map<String, String> inputValues; + /** What delimiters to use. */ + final Map<String, String> inputDelimiters; + /** The interface to the workflow engine subprocess. */ + private final Worker core; + /** Our descriptor token (UUID). */ + private final String masterToken; + /** + * The root working directory for a workflow run, or <tt>null</tt> if it has + * been deleted. + */ + private File base; + /** + * When did this workflow start running, or <tt>null</tt> for + * "never/not yet". + */ + private Date start; + /** + * When did this workflow finish running, or <tt>null</tt> for + * "never/not yet". + */ + private Date finish; + /** The cached status of the workflow run. */ + RemoteStatus status; + /** + * The name of the input Baclava document, or <tt>null</tt> to not do it + * that way. + */ + String inputBaclava; + /** + * The name of the output Baclava document, or <tt>null</tt> to not do it + * that way. + */ + String outputBaclava; + /** + * The file containing the input Baclava document, or <tt>null</tt> to not + * do it that way. + */ + private File inputBaclavaFile; + /** + * The file containing the output Baclava document, or <tt>null</tt> to not + * do it that way. + */ + private File outputBaclavaFile; + /** + * Registered shutdown hook so that we clean up when this process is killed + * off, or <tt>null</tt> if that is no longer necessary. + */ + Thread shutdownHook; + /** Location for security information to be written to. */ + File securityDirectory; + /** + * Password to use to encrypt security information. + */ + char[] keystorePassword = KEYSTORE_PASSWORD; + /** Additional server-specified environment settings. */ + Map<String, String> environment = new HashMap<>(); + /** Additional server-specified java runtime settings. */ + List<String> runtimeSettings = new ArrayList<>(); + URL interactionFeedURL; + URL webdavURL; + URL publishURL;//FIXME + private boolean doProvenance = true; + + // ----------------------- METHODS ----------------------- + + /** + * @param executeWorkflowCommand + * The script used to execute workflows. + * @param workflow + * The workflow to execute. + * @param workerClass + * The class to instantiate as our local representative of the + * run. + * @param urReceiver + * The remote class to report the generated usage record(s) to. + * @param id + * The UUID to use, or <tt>null</tt> if we are to invent one. + * @param seedEnvironment + * The key/value pairs to seed the worker subprocess environment + * with. + * @param javaParams + * Parameters to pass to the worker subprocess java runtime + * itself. + * @param workerFactory + * How to make instances of the low-level worker objects. + * @throws RemoteException + * If registration of the worker fails. + * @throws ImplementationException + * If something goes wrong during local setup. + */ + protected LocalWorker(String executeWorkflowCommand, byte[] workflow, + UsageRecordReceiver urReceiver, UUID id, + Map<String, String> seedEnvironment, List<String> javaParams, + WorkerFactory workerFactory) throws RemoteException, + ImplementationException { + super(); + if (id == null) + id = randomUUID(); + masterToken = id.toString(); + this.workflow = workflow; + this.executeWorkflowCommand = executeWorkflowCommand; + String sharedDir = getProperty(SHARED_DIR_PROP, SLASHTEMP); + base = new File(sharedDir, masterToken); + out.println("about to create " + base); + try { + forceMkdir(base); + for (String subdir : SUBDIR_LIST) { + new File(base, subdir).mkdir(); + } + } catch (IOException e) { + throw new ImplementationException( + "problem creating run working directory", e); + } + baseDir = new DirectoryDelegate(base, null); + inputFiles = new HashMap<>(); + inputRealFiles = new HashMap<>(); + inputValues = new HashMap<>(); + inputDelimiters = new HashMap<>(); + environment.putAll(seedEnvironment); + runtimeSettings.addAll(javaParams); + try { + core = workerFactory.makeInstance(); + } catch (Exception e) { + out.println("problem when creating core worker implementation"); + e.printStackTrace(out); + throw new ImplementationException( + "problem when creating core worker implementation", e); + } + core.setURReceiver(urReceiver); + Thread t = new Thread(new Runnable() { + /** + * Kill off the worker launched by the core. + */ + @Override + public void run() { + try { + shutdownHook = null; + destroy(); + } catch (ImplementationException e) { + // Absolutely nothing we can do here + } + } + }); + getRuntime().addShutdownHook(t); + shutdownHook = t; + status = Initialized; + } + + @Override + public void destroy() throws ImplementationException { + killWorkflowSubprocess(); + removeFromShutdownHooks(); + // Is this it? + deleteWorkingDirectory(); + deleteSecurityManagerDirectory(); + core.deleteLocalResources(); + } + + private void killWorkflowSubprocess() { + if (status != Finished && status != Initialized) + try { + core.killWorker(); + if (finish == null) + finish = new Date(); + } catch (Exception e) { + out.println("problem when killing worker"); + e.printStackTrace(out); + } + } + + private void removeFromShutdownHooks() throws ImplementationException { + try { + if (shutdownHook != null) + getRuntime().removeShutdownHook(shutdownHook); + } catch (RuntimeException e) { + throw new ImplementationException("problem removing shutdownHook", + e); + } finally { + shutdownHook = null; + } + } + + private void deleteWorkingDirectory() throws ImplementationException { + try { + if (base != null) + forceDelete(base); + } catch (IOException e) { + out.println("problem deleting working directory"); + e.printStackTrace(out); + throw new ImplementationException( + "problem deleting working directory", e); + } finally { + base = null; + } + } + + private void deleteSecurityManagerDirectory() + throws ImplementationException { + try { + if (securityDirectory != null) + forceDelete(securityDirectory); + } catch (IOException e) { + out.println("problem deleting security directory"); + e.printStackTrace(out); + throw new ImplementationException( + "problem deleting security directory", e); + } finally { + securityDirectory = null; + } + } + + @Override + public void addListener(RemoteListener listener) throws RemoteException, + ImplementationException { + throw new ImplementationException("not implemented"); + } + + @Override + public String getInputBaclavaFile() { + return inputBaclava; + } + + @Override + public List<RemoteInput> getInputs() throws RemoteException { + ArrayList<RemoteInput> result = new ArrayList<>(); + for (String name : inputFiles.keySet()) + result.add(new InputDelegate(name)); + return result; + } + + @Override + public List<String> getListenerTypes() { + return emptyList(); + } + + @Override + public List<RemoteListener> getListeners() { + return singletonList(core.getDefaultListener()); + } + + @Override + public String getOutputBaclavaFile() { + return outputBaclava; + } + + class SecurityDelegate extends UnicastRemoteObject implements + RemoteSecurityContext { + private void setPrivatePerms(File dir) { + if (!dir.setReadable(false, false) || !dir.setReadable(true, true) + || !dir.setExecutable(false, false) + || !dir.setExecutable(true, true) + || !dir.setWritable(false, false) + || !dir.setWritable(true, true)) { + out.println("warning: " + + "failed to set permissions on security context directory"); + } + } + + protected SecurityDelegate(String token) throws IOException { + super(); + if (DO_MKDIR) { + securityDirectory = new File(SECURITY_DIR, token); + forceMkdir(securityDirectory); + setPrivatePerms(securityDirectory); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. + * @param data + * The bytes to put in the file. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, byte[] data) throws RemoteException, + ImplementationException { + try { + File f = new File(securityDirectory, name); + writeByteArrayToFile(f, data); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. + * @param data + * The lines to put in the file. The + * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding} + * will be used to do the writing. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, Collection<String> data) + throws RemoteException, ImplementationException { + try { + File f = new File(securityDirectory, name); + writeLines(f, SYSTEM_ENCODING, data); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + /** + * Write some data to a given file in the context directory. + * + * @param name + * The name of the file to write. + * @param data + * The line to put in the file. The + * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding} + * will be used to do the writing. + * @throws RemoteException + * If anything goes wrong. + * @throws ImplementationException + */ + protected void write(String name, char[] data) throws RemoteException, + ImplementationException { + try { + File f = new File(securityDirectory, name); + writeLines(f, SYSTEM_ENCODING, asList(new String(data))); + } catch (IOException e) { + throw new ImplementationException("problem writing " + name, e); + } + } + + @Override + public void setKeystore(byte[] keystore) throws RemoteException, + ImplementationException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (keystore == null) + throw new IllegalArgumentException("keystore may not be null"); + write(KEYSTORE_FILE, keystore); + } + + @Override + public void setPassword(char[] password) throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (password == null) + throw new IllegalArgumentException("password may not be null"); + keystorePassword = password.clone(); + } + + @Override + public void setTruststore(byte[] truststore) throws RemoteException, + ImplementationException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (truststore == null) + throw new IllegalArgumentException("truststore may not be null"); + write(TRUSTSTORE_FILE, truststore); + } + + @Override + public void setUriToAliasMap(Map<URI, String> uriToAliasMap) + throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + if (uriToAliasMap == null) + return; + ArrayList<String> lines = new ArrayList<>(); + for (Entry<URI, String> site : uriToAliasMap.entrySet()) + lines.add(site.getKey().toASCIIString() + " " + site.getValue()); + // write(URI_ALIAS_MAP, lines); + } + + @Override + public void setHelioToken(String helioToken) throws RemoteException { + if (status != Initialized) + throw new RemoteException("not initializing"); + out.println("registering HELIO CIS token for export"); + environment.put(HELIO_TOKEN_NAME, helioToken); + } + } + + @Override + public RemoteSecurityContext getSecurityContext() throws RemoteException, + ImplementationException { + try { + return new SecurityDelegate(masterToken); + } catch (RemoteException e) { + if (e.getCause() != null) + throw new ImplementationException( + "problem initializing security context", e.getCause()); + throw e; + } catch (IOException e) { + throw new ImplementationException( + "problem initializing security context", e); + } + } + + @Override + public RemoteStatus getStatus() { + // only state that can spontaneously change to another + if (status == Operating) { + status = core.getWorkerStatus(); + if (status == Finished && finish == null) + finish = new Date(); + } + return status; + } + + @Override + public RemoteDirectory getWorkingDirectory() { + return baseDir; + } + + File validateFilename(String filename) throws RemoteException { + if (filename == null) + throw new IllegalArgumentException("filename must be non-null"); + try { + return getValidatedFile(base, filename.split("/")); + } catch (IOException e) { + throw new IllegalArgumentException("failed to validate filename", e); + } + } + + class InputDelegate extends UnicastRemoteObject implements RemoteInput { + private String name; + + InputDelegate(String name) throws RemoteException { + super(); + this.name = name; + if (!inputFiles.containsKey(name)) { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputFiles.put(name, null); + inputRealFiles.put(name, null); + inputValues.put(name, null); + inputDelimiters.put(name, null); + } + } + + @Override + public String getFile() { + return inputFiles.get(name); + } + + @Override + public String getName() { + return name; + } + + @Override + public String getValue() { + return inputValues.get(name); + } + + @Override + public String getDelimiter() throws RemoteException { + return inputDelimiters.get(name); + } + + @Override + public void setFile(String file) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputRealFiles.put(name, validateFilename(file)); + inputValues.put(name, null); + inputFiles.put(name, file); + inputBaclava = null; + } + + @Override + public void setValue(String value) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputValues.put(name, value); + inputFiles.put(name, null); + inputRealFiles.put(name, null); + inputBaclava = null; + } + + @Override + public void setDelimiter(String delimiter) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + if (inputBaclava != null) + throw new IllegalStateException("input baclava file set"); + if (delimiter != null) { + if (delimiter.length() > 1) + throw new IllegalStateException( + "multi-character delimiter not permitted"); + if (delimiter.charAt(0) == 0) + throw new IllegalStateException( + "may not use NUL for splitting"); + if (delimiter.charAt(0) > 127) + throw new IllegalStateException( + "only ASCII characters supported for splitting"); + } + inputDelimiters.put(name, delimiter); + } + } + + @Override + public RemoteInput makeInput(String name) throws RemoteException { + return new InputDelegate(name); + } + + @Override + public RemoteListener makeListener(String type, String configuration) + throws RemoteException { + throw new RemoteException("listener manufacturing unsupported"); + } + + @Override + public void setInputBaclavaFile(String filename) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + inputBaclavaFile = validateFilename(filename); + for (String input : inputFiles.keySet()) { + inputFiles.put(input, null); + inputRealFiles.put(input, null); + inputValues.put(input, null); + } + inputBaclava = filename; + } + + @Override + public void setOutputBaclavaFile(String filename) throws RemoteException { + if (status != Initialized) + throw new IllegalStateException("not initializing"); + if (filename != null) + outputBaclavaFile = validateFilename(filename); + else + outputBaclavaFile = null; + outputBaclava = filename; + } + + @Override + public void setGenerateProvenance(boolean prov) { + doProvenance = prov; + } + + @Override + public void setStatus(RemoteStatus newStatus) + throws IllegalStateTransitionException, RemoteException, + ImplementationException, StillWorkingOnItException { + if (status == newStatus) + return; + + switch (newStatus) { + case Initialized: + throw new IllegalStateTransitionException( + "may not move back to start"); + case Operating: + switch (status) { + case Initialized: + boolean started; + try { + started = createWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem creating executing workflow", e); + } + if (!started) + throw new StillWorkingOnItException( + "workflow start in process"); + break; + case Stopped: + try { + core.startWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem continuing workflow run", e); + } + break; + case Finished: + throw new IllegalStateTransitionException("already finished"); + default: + break; + } + status = Operating; + break; + case Stopped: + switch (status) { + case Initialized: + throw new IllegalStateTransitionException( + "may only stop from Operating"); + case Operating: + try { + core.stopWorker(); + } catch (Exception e) { + throw new ImplementationException( + "problem stopping workflow run", e); + } + break; + case Finished: + throw new IllegalStateTransitionException("already finished"); + default: + break; + } + status = Stopped; + break; + case Finished: + switch (status) { + case Operating: + case Stopped: + try { + core.killWorker(); + if (finish == null) + finish = new Date(); + } catch (Exception e) { + throw new ImplementationException( + "problem killing workflow run", e); + } + default: + break; + } + status = Finished; + break; + } + } + + private boolean createWorker() throws Exception { + start = new Date(); + char[] pw = keystorePassword; + keystorePassword = null; + /* + * Do not clear the keystorePassword array here; its ownership is + * *transferred* to the worker core which doesn't copy it but *does* + * clear it after use. + */ + return core.initWorker(this, executeWorkflowCommand, workflow, base, + inputBaclavaFile, inputRealFiles, inputValues, inputDelimiters, + outputBaclavaFile, securityDirectory, pw, doProvenance, + environment, masterToken, runtimeSettings); + } + + @Override + public Date getFinishTimestamp() { + return finish == null ? null : new Date(finish.getTime()); + } + + @Override + public Date getStartTimestamp() { + return start == null ? null : new Date(start.getTime()); + } + + @Override + public void setInteractionServiceDetails(URL feed, URL webdav, URL publish) { + interactionFeedURL = feed; + webdavURL = webdav; + publishURL = publish; + } + + @Override + public void ping() { + // Do nothing here; this *should* be empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/TavernaRunManager.java ---------------------------------------------------------------------- diff --git a/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/TavernaRunManager.java b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/TavernaRunManager.java new file mode 100644 index 0000000..167302c --- /dev/null +++ b/taverna-server-worker/src/main/java/org/apache/taverna/server/localworker/impl/TavernaRunManager.java @@ -0,0 +1,255 @@ +/* + */ +package org.taverna.server.localworker.impl; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.lang.Runtime.getRuntime; +import static java.lang.System.exit; +import static java.lang.System.getProperty; +import static java.lang.System.out; +import static java.lang.System.setProperty; +import static java.lang.System.setSecurityManager; +import static java.rmi.registry.LocateRegistry.getRegistry; +import static org.taverna.server.localworker.api.Constants.DEATH_DELAY; +import static org.taverna.server.localworker.api.Constants.LOCALHOST; +import static org.taverna.server.localworker.api.Constants.RMI_HOST_PROP; +import static org.taverna.server.localworker.api.Constants.SECURITY_POLICY_FILE; +import static org.taverna.server.localworker.api.Constants.SEC_POLICY_PROP; +import static org.taverna.server.localworker.api.Constants.UNSECURE_PROP; + +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.rmi.RemoteException; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.taverna.scufl2.api.io.WorkflowBundleIO; +import org.taverna.server.localworker.api.RunAccounting; +import org.taverna.server.localworker.api.Worker; +import org.taverna.server.localworker.api.WorkerFactory; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.server.UsageRecordReceiver; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + + +/** + * The registered factory for runs, this class is responsible for constructing + * runs that are suitable for particular users. It is also the entry point for + * this whole process. + * + * @author Donal Fellows + * @see LocalWorker + */ +@SuppressWarnings("serial") +public class TavernaRunManager extends UnicastRemoteObject implements + RemoteRunFactory, RunAccounting, WorkerFactory { + String command; + Map<String, String> seedEnvironment = new HashMap<>(); + List<String> javaInitParams = new ArrayList<>(); + private WorkflowBundleIO io; + private int activeRuns = 0; + // Hacks! + public static String interactionHost; + public static String interactionPort; + public static String interactionWebdavPath; + public static String interactionFeedPath; + + /** + * How to get the actual workflow document from the XML document that it is + * contained in. + * + * @param containerDocument + * The document sent from the web interface. + * @return The element describing the workflow, as expected by the Taverna + * command line executor. + */ + protected Element unwrapWorkflow(Document containerDocument) { + return (Element) containerDocument.getDocumentElement().getFirstChild(); + } + + private static final String usage = "java -jar server.worker.jar workflowExecScript ?-Ekey=val...? ?-Jconfig? UUID"; + + /** + * An RMI-enabled factory for runs. + * + * @param command + * What command to call to actually run a run. + * @throws RemoteException + * If anything goes wrong during creation of the instance. + */ + public TavernaRunManager(String command) throws RemoteException { + this.command = command; + this.io = new WorkflowBundleIO(); + } + + @Override + public RemoteSingleRun make(byte[] workflow, String creator, + UsageRecordReceiver urReceiver, UUID id) throws RemoteException { + if (creator == null) + throw new RemoteException("no creator"); + try { + URI wfid = io.readBundle(new ByteArrayInputStream(workflow), null) + .getMainWorkflow().getIdentifier(); + out.println("Creating run from workflow <" + wfid + "> for <" + + creator + ">"); + return new LocalWorker(command, workflow, urReceiver, id, + seedEnvironment, javaInitParams, this); + } catch (RemoteException e) { + throw e; + } catch (Exception e) { + throw new RemoteException("bad instance construction", e); + } + } + + private static boolean shuttingDown; + private static String factoryName; + private static Registry registry; + + static synchronized void unregisterFactory() { + if (!shuttingDown) { + shuttingDown = true; + try { + if (factoryName != null && registry != null) + registry.unbind(factoryName); + } catch (Exception e) { + e.printStackTrace(out); + } + } + } + + @Override + public void shutdown() { + unregisterFactory(); + new Thread(new DelayedDeath()).start(); + } + + static class DelayedDeath implements Runnable { + @Override + public void run() { + try { + Thread.sleep(DEATH_DELAY); + } catch (InterruptedException e) { + } finally { + exit(0); + } + } + } + + private void addArgument(String arg) { + if (arg.startsWith("-E")) { + String trimmed = arg.substring(2); + int idx = trimmed.indexOf('='); + if (idx > 0) { + addEnvironmentDefinition(trimmed.substring(0, idx), + trimmed.substring(idx + 1)); + return; + } + } else if (arg.startsWith("-D")) { + if (arg.indexOf('=') > 0) { + addJavaParameter(arg); + return; + } + } else if (arg.startsWith("-J")) { + addJavaParameter(arg.substring(2)); + return; + } + throw new IllegalArgumentException("argument \"" + arg + + "\" must start with -D, -E or -J; " + + "-D and -E must contain a \"=\""); + } + + /** + * @param args + * The arguments from the command line invocation. + * @throws Exception + * If we can't connect to the RMI registry, or if we can't read + * the workflow, or if we can't build the worker instance, or + * register it. Also if the arguments are wrong. + */ + public static void main(String[] args) throws Exception { + if (args.length < 2) + throw new Exception("wrong # args: must be \"" + usage + "\""); + if (!getProperty(UNSECURE_PROP, "no").equals("yes")) { + setProperty(SEC_POLICY_PROP, LocalWorker.class.getClassLoader() + .getResource(SECURITY_POLICY_FILE).toExternalForm()); + setProperty(RMI_HOST_PROP, LOCALHOST); + } + setSecurityManager(new SecurityManager()); + factoryName = args[args.length - 1]; + TavernaRunManager man = new TavernaRunManager(args[0]); + for (int i = 1; i < args.length - 1; i++) + man.addArgument(args[i]); + registry = getRegistry(LOCALHOST); + + registry.bind(factoryName, man); + getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + unregisterFactory(); + } + }); + out.println("registered RemoteRunFactory with ID " + factoryName); + } + + private void addJavaParameter(String string) { + this.javaInitParams.add(string); + } + + private void addEnvironmentDefinition(String key, String value) { + this.seedEnvironment.put(key, value); + } + + @Override + public void setInteractionServiceDetails(String host, String port, + String webdavPath, String feedPath) throws RemoteException { + if (host == null || port == null || webdavPath == null + || feedPath == null) + throw new IllegalArgumentException("all params must be non-null"); + interactionHost = host; + interactionPort = port; + interactionWebdavPath = webdavPath; + interactionFeedPath = feedPath; + } + + @Override + public synchronized int countOperatingRuns() { + return (activeRuns < 0 ? 0 : activeRuns); + } + + @Override + public synchronized void runStarted() { + activeRuns++; + } + + @Override + public synchronized void runCeased() { + activeRuns--; + } + + @Override + public Worker makeInstance() throws Exception { + return new WorkerCore(this); + } +}
