http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/Policy.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/Policy.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/Policy.java new file mode 100644 index 0000000..b09e0bd --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/Policy.java @@ -0,0 +1,133 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.net.URI; +import java.util.List; + +import org.taverna.server.master.common.Status; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.exceptions.NoDestroyException; +import org.taverna.server.master.exceptions.NoUpdateException; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Simple policy interface. 
+ * + * @author Donal Fellows + */ +public interface Policy { + /** + * @return The maximum number of runs that the system can support. + */ + int getMaxRuns(); + + /** + * Get the limit on the number of runs for this user. + * + * @param user + * Who to get the limit for + * @return The maximum number of runs for this user, or <tt>null</tt> if no + * per-user limit is imposed and only system-wide limits are to be + * enforced. + */ + Integer getMaxRuns(UsernamePrincipal user); + + /** + * Test whether the user can create an instance of the given workflow. + * + * @param user + * Who wants to do the creation. + * @param workflow + * The workflow they wish to instantiate. + * @throws NoCreateException + * If they may not instantiate it. + */ + void permitCreate(UsernamePrincipal user, Workflow workflow) + throws NoCreateException; + + /** + * Test whether the user can destroy a workflow instance run or manipulate + * its expiry date. + * + * @param user + * Who wants to do the deletion. + * @param run + * What they want to delete. + * @throws NoDestroyException + * If they may not destroy it. + */ + void permitDestroy(UsernamePrincipal user, TavernaRun run) + throws NoDestroyException; + + /** + * Return whether the user has access to a particular workflow run. + * <b>Note</b> that this does not throw any exceptions! + * + * @param user + * Who wants to read the workflow's state. + * @param run + * What do they want to read from. + * @return Whether they can read it. Note that this check is always applied + * before testing whether the workflow can be updated or deleted by + * the user. + */ + boolean permitAccess(UsernamePrincipal user, TavernaRun run); + + /** + * Test whether the user can modify a workflow run (other than for its + * expiry date). + * + * @param user + * Who wants to do the modification. + * @param run + * What they want to modify. + * @throws NoUpdateException + * If they may not modify it. 
+ */ + void permitUpdate(UsernamePrincipal user, TavernaRun run) + throws NoUpdateException; + + /** + * Get the URIs of the workflows that the given user may execute. + * + * @param user + * Who are we finding out on behalf of. + * @return A list of workflow URIs that they may instantiate, or + * <tt>null</tt> if any workflow may be submitted. + */ + List<URI> listPermittedWorkflowURIs(UsernamePrincipal user); + + /** + * @return The maximum number of {@linkplain Status#Operating operating} + * runs that the system can support. + */ + int getOperatingLimit(); + + /** + * Set the URIs of the workflows that the given user may execute. + * + * @param user + * Who the permitted workflows are being set for. + * @param permitted + * A list of workflow URIs that they may instantiate. + */ + void setPermittedWorkflowURIs(UsernamePrincipal user, List<URI> permitted); +}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/RunStore.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/RunStore.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/RunStore.java new file mode 100644 index 0000000..b0d817a --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/RunStore.java @@ -0,0 +1,95 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map; + +import org.taverna.server.master.exceptions.UnknownRunException; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Interface to the mechanism that looks after the mapping of names to runs. + * Instances of this class may also be responsible for enforcing timely cleanup + * of expired workflows. + * + * @author Donal Fellows. + */ +public interface RunStore { + /** + * Obtain the workflow run for a given user and name. 
+ * + * @param user + * Who wants to do the lookup. + * @param p + * The general policy system context. + * @param uuid + * The handle for the run. + * @return The workflow instance run. + * @throws UnknownRunException + * If the lookup fails (either because it does not exist or + * because it is not permitted for the user by the policy). + */ + TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid) + throws UnknownRunException; + + /** + * Obtain the named workflow run. + * + * @param uuid + * The handle for the run. + * @return The workflow instance run. + * @throws UnknownRunException + * If the lookup fails (either because it does not exist or + * because it is not permitted for the user by the policy). + */ + public TavernaRun getRun(String uuid) throws UnknownRunException; + + /** + * List the runs that a particular user may access. + * + * @param user + * Who wants to do the lookup, or <code>null</code> if it is + * being done "by the system" when the full mapping should be + * returned. + * @param p + * The general policy system context. + * @return A mapping from run names to run instances. + */ + Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p); + + /** + * Adds a workflow instance run to the store. Note that this operation is + * <i>not</i> expected to be security-checked; that is the callers' + * responsibility. + * + * @param run + * The run itself. + * @return The name of the run. + */ + String registerRun(TavernaRun run); + + /** + * Removes a run from the store. Note that this operation is <i>not</i> + * expected to be security-checked; that is the callers' responsibility. + * + * @param uuid + * The name of the run. 
+ */ + void unregisterRun(String uuid); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/SecurityContextFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/SecurityContextFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/SecurityContextFactory.java new file mode 100644 index 0000000..a0cac79 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/SecurityContextFactory.java @@ -0,0 +1,45 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Serializable; + +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * How to create instances of a security context. + * + * @author Donal Fellows + */ +public interface SecurityContextFactory extends Serializable { + /** + * Creates a security context. + * + * @param run + * Handle to remote run. 
Allows the security context to know how + * to apply itself to the workflow run. + * @param owner + * The identity of the owner of the workflow run. + * @return The security context. + * @throws Exception + * If anything goes wrong. + */ + TavernaSecurityContext create(TavernaRun run, UsernamePrincipal owner) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaRun.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaRun.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaRun.java new file mode 100644 index 0000000..8d9a7f8 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaRun.java @@ -0,0 +1,232 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.Serializable; +import java.util.Date; +import java.util.List; + +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.common.Status; +import org.taverna.server.master.exceptions.BadStateChangeException; +import org.taverna.server.master.exceptions.FilesystemAccessException; +import org.taverna.server.master.exceptions.NoDestroyException; +import org.taverna.server.master.exceptions.UnknownRunException; + +/** + * The interface to a taverna workflow run, or "run" for short. + * + * @author Donal Fellows + */ +public interface TavernaRun extends Serializable { + /** + * @return The identifier of the run. + */ + String getId(); + + /** + * @return What this run was created to execute. + */ + Workflow getWorkflow(); + + /** + * @return The name of the run. + */ + String getName(); + + /** + * @param name + * The new name of the run. May be truncated. + */ + void setName(String name); + + /** + * @return The name of the Baclava file to use for all inputs, or + * <tt>null</tt> if no Baclava file is set. + */ + String getInputBaclavaFile(); + + /** + * Sets the Baclava file to use for all inputs. This overrides the use of + * individual inputs. + * + * @param filename + * The filename to use. Must not start with a <tt>/</tt> or + * contain any <tt>..</tt> segments. Will be interpreted relative + * to the run's working directory. + * @throws FilesystemAccessException + * If the filename is invalid. + * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + void setInputBaclavaFile(String filename) throws FilesystemAccessException, + BadStateChangeException; + + /** + * @return The list of input assignments. + */ + List<Input> getInputs(); + + /** + * Create an input assignment. + * + * @param name + * The name of the port that this will be an input for. + * @return The assignment reference. 
+ * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + Input makeInput(String name) throws BadStateChangeException; + + /** + * @return The file (relative to the working directory) to write the outputs + * of the run to as a Baclava document, or <tt>null</tt> if they are + * to be written to non-Baclava files in a directory called + * <tt>out</tt>. + */ + String getOutputBaclavaFile(); + + /** + * Sets where the output of the run is to be written to. This will cause the + * output to be generated as a Baclava document, rather than a collection of + * individual non-Baclava files in the subdirectory of the working directory + * called <tt>out</tt>. + * + * @param filename + * Where to write the Baclava file (or <tt>null</tt> to cause the + * output to be written to individual files); overwrites any + * previous setting of this value. + * @throws FilesystemAccessException + * If the filename starts with a <tt>/</tt> or contains a + * <tt>..</tt> segment. + * @throws BadStateChangeException + * If the workflow is not in the {@link Status#Initialized + * Initialized} state. + */ + void setOutputBaclavaFile(String filename) + throws FilesystemAccessException, BadStateChangeException; + + /** + * @return When this run will expire, becoming eligible for automated + * deletion. + */ + Date getExpiry(); + + /** + * Set when this run will expire. + * + * @param d + * Expiry time. Deletion will happen some time after that. + */ + void setExpiry(Date d); + + /** + * @return The current status of the run. + */ + Status getStatus(); + + /** + * Set the status of the run, which should cause it to move into the given + * state. This may cause some significant changes. + * + * @param s + * The state to try to change to. + * @return <tt>null</tt>, or a string describing the incomplete state change + * if the operation has internally timed out. 
+ * @throws BadStateChangeException + * If the change to the given state is impossible. + */ + String setStatus(Status s) throws BadStateChangeException; + + /** + * @return Handle to the main working directory of the run. + * @throws FilesystemAccessException + */ + Directory getWorkingDirectory() throws FilesystemAccessException; + + /** + * @return The list of listener instances attached to the run. + */ + List<Listener> getListeners(); + + /** + * Add a listener to the run. + * + * @param listener + * The listener to add. + */ + void addListener(Listener listener); + + /** + * @return The security context structure for this run. + */ + TavernaSecurityContext getSecurityContext(); + + /** + * Kill off this run, removing all resources which it consumes. + * + * @throws NoDestroyException + * If the destruction failed. + */ + void destroy() throws NoDestroyException; + + /** + * @return When this workflow run was created. + */ + Date getCreationTimestamp(); + + /** + * @return When this workflow run was started, or <tt>null</tt> if it has + * never been started. + */ + Date getStartTimestamp(); + + /** + * @return When this workflow run was found to have finished, or + * <tt>null</tt> if it has never finished (either still running or + * never started). + */ + Date getFinishTimestamp(); + + /** + * Test if this run is really there. + * + * <p> + * <i>Implementation note:</i> Used to test communication fabrics, etc. so + * implementations of this interface that do not delegate to another object + * should do nothing. + * + * @throws UnknownRunException + * If things fail. 
+ */ + void ping() throws UnknownRunException; + + /** + * @return whether the run generates provenance data + */ + boolean getGenerateProvenance(); + + /** + * @param generateProvenance + * whether the run generates provenance data + */ + void setGenerateProvenance(boolean generateProvenance); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaSecurityContext.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaSecurityContext.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaSecurityContext.java new file mode 100644 index 0000000..3f993df --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/TavernaSecurityContext.java @@ -0,0 +1,226 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.Principal; +import java.util.Set; + +import javax.ws.rs.core.HttpHeaders; +import javax.xml.ws.handler.MessageContext; + +import org.springframework.security.core.context.SecurityContext; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.master.common.Credential; +import org.taverna.server.master.common.Trust; +import org.taverna.server.master.exceptions.InvalidCredentialException; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Security context for a workflow run. + * + * @author Donal Fellows + */ +public interface TavernaSecurityContext { + /** + * @return Who owns the security context. + */ + UsernamePrincipal getOwner(); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may destroy the run or manipulate its + * lifetime. + * + * @return The names of the users who may use destroy operations. Read-only. + */ + Set<String> getPermittedDestroyers(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may destroy the run or manipulate its + * lifetime. + * + * @param destroyers + * The names of the users who may use destroy operations. + */ + void setPermittedDestroyers(Set<String> destroyers); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may update the run (including writing to + * files). + * + * @return The names of the users who may use update operations. Read-only. + */ + Set<String> getPermittedUpdaters(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may update the run (including writing to + * its files). + * + * @param updaters + * The names of the users who may use update operations. 
+ */ + void setPermittedUpdaters(Set<String> updaters); + + /** + * Describe the names of the users (as extracted from their + * {@link Principal} objects) that may read from the run (including its + * files). + * + * @return The names of the users who may use read operations. Read-only. + */ + Set<String> getPermittedReaders(); + + /** + * Sets the collection of names of users (as extracted from their + * {@link Principal} objects) that may read from the run (including its + * files). + * + * @param readers + * The names of the users who may use read operations. + */ + void setPermittedReaders(Set<String> readers); + + /** + * @return The credentials owned by the user. Never <tt>null</tt>. + */ + Credential[] getCredentials(); + + /** + * Add a credential to the owned set, or replace the old version with the + * new one. + * + * @param toAdd + * The credential to add. + */ + void addCredential(Credential toAdd); + + /** + * Remove a credential from the owned set. It's not a failure to remove + * something that isn't in the set. + * + * @param toDelete + * The credential to remove. + */ + void deleteCredential(Credential toDelete); + + /** + * Tests if the credential is valid. This includes testing whether the + * underlying credential file exists and can be unlocked by the password in + * the {@link Credential} object. + * + * @param c + * The credential object to validate. + * @throws InvalidCredentialException + * If it is invalid. + */ + void validateCredential(Credential c) throws InvalidCredentialException; + + /** + * @return The identities trusted by the user. Never <tt>null</tt>. + */ + Trust[] getTrusted(); + + /** + * Add an identity to the trusted set. + * + * @param toAdd + * The identity to add. + */ + void addTrusted(Trust toAdd); + + /** + * Remove an identity from the trusted set. It's not a failure to remove + * something that isn't in the set. + * + * @param toDelete + * The identity to remove. 
+ */ + void deleteTrusted(Trust toDelete); + + /** + * Tests if the trusted identity descriptor is valid. This includes checking + * whether the underlying trusted identity file exists. + * + * @param t + * The trusted identity descriptor to check. + * @throws InvalidCredentialException + * If it is invalid. + */ + void validateTrusted(Trust t) throws InvalidCredentialException; + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param securityContext + * The security context associated with the request that caused + * the workflow to be created. + * @throws Exception + * If anything goes wrong. + */ + void initializeSecurityFromContext(SecurityContext securityContext) + throws Exception; + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param context + * The full information about the request that caused the + * workflow to be created. + */ + void initializeSecurityFromSOAPContext(MessageContext context); + + /** + * Establish the security context from how the owning workflow run was + * created. In particular, this gives an opportunity for boot-strapping + * things with any delegateable credentials. + * + * @param headers + * The full information about the request that caused the + * workflow to be created. + */ + void initializeSecurityFromRESTContext(HttpHeaders headers); + + /** + * Transfer the security context to the remote system. + * + * @throws IOException + * If the communication fails. + * @throws GeneralSecurityException + * If the assembly of the context fails. + * @throws ImplementationException + * If the local worker has problems with creating the realized + * security context. 
+ */ + void conveySecurity() throws GeneralSecurityException, IOException, + ImplementationException; + + /** + * @return The factory that created this security context. + */ + SecurityContextFactory getFactory(); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/UriBuilderFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/UriBuilderFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/UriBuilderFactory.java new file mode 100644 index 0000000..c4d0fb5 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/UriBuilderFactory.java @@ -0,0 +1,56 @@ +/* + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.net.URI; + +import javax.ws.rs.core.UriBuilder; + +/** + * How to manufacture URIs to workflow runs. 
+ * + * @author Donal Fellows + */ +public interface UriBuilderFactory { + /** + * Given a run, get a factory for RESTful URIs to resources associated + * with it. + * + * @param run + * The run in question. + * @return The {@link URI} factory. + */ + UriBuilder getRunUriBuilder(TavernaRun run); + + /** + * @return a URI factory that is preconfigured to point to the base of + * the webapp. + */ + UriBuilder getBaseUriBuilder(); + + /** + * Resolves a URI with respect to the base URI of the factory. + * + * @param uri + * The URI to resolve, or <tt>null</tt>. + * @return The resolved URI, or <tt>null</tt> if <b>uri</b> is + * <tt>null</tt>. + */ + String resolve(String uri); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/package-info.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/package-info.java new file mode 100644 index 0000000..9c9b5b8 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/interfaces/package-info.java @@ -0,0 +1,23 @@ +/* + */ +/** + * Interfaces to the main worker classes that provide the magical power + * that drives the webapp front-end. + */ +package org.taverna.server.master.interfaces; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/AbstractRemoteRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/AbstractRemoteRunFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/AbstractRemoteRunFactory.java new file mode 100644 index 0000000..72004b4 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/AbstractRemoteRunFactory.java @@ -0,0 +1,452 @@ +/* + */ +package org.taverna.server.master.localworker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static java.lang.System.getSecurityManager; +import static java.lang.System.setProperty; +import static java.lang.System.setSecurityManager; +import static java.rmi.registry.LocateRegistry.createRegistry; +import static java.rmi.registry.LocateRegistry.getRegistry; +import static java.rmi.registry.Registry.REGISTRY_PORT; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; +import static org.taverna.server.master.rest.TavernaServerRunREST.PathNames.DIR; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.URI; +import java.net.URL; +import java.rmi.MarshalledObject; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.Resource; +import javax.xml.bind.JAXBException; + +import org.apache.commons.io.IOUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.server.UsageRecordReceiver; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.exceptions.NoListenerException; +import org.taverna.server.master.factories.ListenerFactory; +import org.taverna.server.master.factories.RunFactory; +import org.taverna.server.master.interaction.InteractionFeedSupport; +import org.taverna.server.master.interfaces.Listener; +import org.taverna.server.master.interfaces.SecurityContextFactory; +import 
org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.interfaces.UriBuilderFactory; +import org.taverna.server.master.notification.atom.EventDAO; +import org.taverna.server.master.usage.UsageRecordRecorder; +import org.taverna.server.master.utils.UsernamePrincipal; +import org.taverna.server.master.worker.FactoryBean; +import org.taverna.server.master.worker.RemoteRunDelegate; +import org.taverna.server.master.worker.RunFactoryConfiguration; + +import org.apache.taverna.scufl2.api.io.WriterException; + +/** + * Bridge to remote runs via RMI. + * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "Factory", description = "The factory for runs") +public abstract class AbstractRemoteRunFactory extends RunFactoryConfiguration + implements ListenerFactory, RunFactory, FactoryBean { + /** + * Whether to apply stronger limitations than normal to RMI. It is + * recommended that this be true! + */ + @Value("${rmi.localhostOnly}") + private boolean rmiLocalhostOnly; + /** The interaction host name. */ + private String interhost; + /** The interaction port number. */ + private String interport; + private Process registryProcess; + /** + * The interaction WebDAV location. Will be resolved before being passed to + * the back-end. + */ + private String interwebdav; + /** + * The interaction ATOM feed location. Will be resolved before being passed + * to the back-end. + */ + private String interfeed; + /** Used for doing URI resolution. 
*/ + @Resource(name = "webapp") + private UriBuilderFactory baseurifactory; + @Autowired + private InteractionFeedSupport interactionFeedSupport; + + @Value("${taverna.interaction.host}") + void setInteractionHost(String host) { + if (host != null && host.equals("none")) + host = null; + interhost = host; + } + + @Value("${taverna.interaction.port}") + void setInteractionPort(String port) { + if (port != null && port.equals("none")) + port = null; + interport = port; + } + + @Value("${taverna.interaction.webdav_path}") + void setInteractionWebdav(String webdav) { + if (webdav != null && webdav.equals("none")) + webdav = null; + interwebdav = webdav; + } + + @Value("${taverna.interaction.feed_path}") + void setInteractionFeed(String feed) { + if (feed != null && feed.equals("none")) + feed = null; + interfeed = feed; + } + + @Override + protected void reinitRegistry() { + registry = null; + if (registryProcess != null) { + registryProcess.destroy(); + registryProcess = null; + } + } + + protected void initInteractionDetails(RemoteRunFactory factory) + throws RemoteException { + if (interhost != null) { + String feed = baseurifactory.resolve(interfeed); + String webdav = baseurifactory.resolve(interwebdav); + factory.setInteractionServiceDetails(interhost, interport, webdav, + feed); + } + } + + protected static final Process launchSubprocess(ProcessBuilder b) + throws IOException { + Thread t = Thread.currentThread(); + ClassLoader ccl = t.getContextClassLoader(); + try { + t.setContextClassLoader(null); + return b.start(); + } finally { + t.setContextClassLoader(ccl); + } + } + + /** Get a handle to a new instance of the RMI registry. 
*/ + private Registry makeRegistry(int port) throws RemoteException { + ProcessBuilder p = new ProcessBuilder(getJavaBinary()); + p.command().add("-jar"); + p.command().add(getRmiRegistryJar()); + p.command().add(Integer.toString(port)); + p.command().add(Boolean.toString(rmiLocalhostOnly)); + try { + Process proc = launchSubprocess(p); + Thread.sleep(getSleepTime()); + try { + if (proc.exitValue() == 0) + return null; + String error = IOUtils.toString(proc.getErrorStream()); + throw new RemoteException(error); + } catch (IllegalThreadStateException ise) { + // Still running! + } + try (ObjectInputStream ois = new ObjectInputStream( + proc.getInputStream())) { + @SuppressWarnings("unchecked") + Registry r = ((MarshalledObject<Registry>) ois.readObject()) + .get(); + registryProcess = proc; + return r; + } + } catch (RemoteException e) { + throw e; + } catch (ClassNotFoundException e) { + throw new RemoteException("unexpected registry type", e); + } catch (IOException e) { + throw new RemoteException("unexpected IO problem with registry", e); + } catch (InterruptedException e) { + throw new RemoteException("unexpected interrupt"); + } + } + + /** + * @return A handle to the current RMI registry. 
+ */ + protected Registry getTheRegistry() { + try { + if (registry != null) { + registry.list(); + return registry; + } + } catch (RemoteException e) { + log.warn("non-functioning existing registry handle", e); + registry = null; + } + try { + registry = getRegistry(getRegistryHost(), getRegistryPort()); + registry.list(); + return registry; + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle."); + registry = null; + log.warn("Will build new registry, " + + "but service restart ability is at risk."); + try { + registry = makeRegistry(getRegistryPort()); + registry.list(); + return registry; + } catch (RemoteException e2) { + log.error( + "failed to create local working RMI registry on port " + + getRegistryPort(), e2); + log.info("original connection exception", e); + } + } + try { + registry = getRegistry(getRegistryHost(), REGISTRY_PORT); + registry.list(); + return registry; + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle on backup port."); + try { + registry = makeRegistry(REGISTRY_PORT); + registry.list(); + return registry; + } catch (RemoteException e2) { + log.fatal( + "totally failed to get registry handle, even on fallback!", + e2); + log.info("original connection exception", e); + registry = null; + throw new RuntimeException("No RMI Registry Available"); + } + } + } + + private Registry registry; + /** + * The name of the resource that describes the default security policy to + * install. 
+ */ + public static final String SECURITY_POLICY_FILE = "security.policy"; + private SecurityContextFactory securityFactory; + UsageRecordRecorder usageRecordSink; + private EventDAO masterEventFeed; + + @Autowired(required = true) + void setSecurityContextFactory(SecurityContextFactory factory) { + this.securityFactory = factory; + } + + @Autowired(required = true) + void setMasterEventFeed(EventDAO masterEventFeed) { + this.masterEventFeed = masterEventFeed; + } + + @Autowired(required = true) + void setUsageRecordSink(UsageRecordRecorder usageRecordSink) { + this.usageRecordSink = usageRecordSink; + } + + /** + * Configures the Java security model. Not currently used, as it is + * viciously difficult to get right! + */ + @SuppressWarnings("unused") + private static void installSecurityManager() { + if (getSecurityManager() == null) { + setProperty("java.security.policy", AbstractRemoteRunFactory.class + .getClassLoader().getResource(SECURITY_POLICY_FILE) + .toExternalForm()); + setSecurityManager(new SecurityManager()); + } + } + + // static { + // installSecurityManager(); + // } + + /** + * Set up the run expiry management engine. 
+ * + * @throws JAXBException + */ + public AbstractRemoteRunFactory() throws JAXBException { + try { + registry = LocateRegistry.getRegistry(); + registry.list(); + } catch (RemoteException e) { + log.warn("Failed to get working RMI registry handle."); + log.warn("Will build new registry, but service restart ability is at risk."); + try { + registry = createRegistry(REGISTRY_PORT); + registry.list(); + } catch (RemoteException e2) { + log.error("failed to create working RMI registry", e2); + log.info("original connection exception", e); + } + } + } + + @Override + public List<String> getSupportedListenerTypes() { + try { + RemoteRunDelegate rrd = runDB.pickArbitraryRun(); + if (rrd != null) + return rrd.getListenerTypes(); + log.warn("no remote runs; no listener types"); + } catch (Exception e) { + log.warn("failed to get list of listener types", e); + } + return new ArrayList<>(); + } + + @Override + public Listener makeListener(TavernaRun run, String listenerType, + String configuration) throws NoListenerException { + if (run instanceof RemoteRunDelegate) + return ((RemoteRunDelegate) run).makeListener(listenerType, + configuration); + throw new NoListenerException("unexpected run type: " + run.getClass()); + } + + @Override + public TavernaRun create(UsernamePrincipal creator, Workflow workflow) + throws NoCreateException { + try { + Date now = new Date(); + UUID id = randomUUID(); + RemoteSingleRun rsr = getRealRun(creator, workflow, id); + RemoteRunDelegate run = new RemoteRunDelegate(now, workflow, rsr, + state.getDefaultLifetime(), runDB, id, + state.getGenerateProvenance(), this); + run.setSecurityContext(securityFactory.create(run, creator)); + @Nonnull + URI feed = interactionFeedSupport.getFeedURI(run); + @Nonnull + URL feedUrl = feed.toURL(); + @Nonnull + URL webdavUrl = baseurifactory.getRunUriBuilder(run) + .path(DIR + "/interactions").build().toURL(); + @Nullable + URL pub = interactionFeedSupport.getLocalFeedBase(feed); + 
rsr.setInteractionServiceDetails(feedUrl, webdavUrl, pub); + return run; + } catch (NoCreateException e) { + log.warn("failed to build run instance", e); + throw e; + } catch (Exception e) { + log.warn("failed to build run instance", e); + throw new NoCreateException("failed to build run instance", e); + } + } + + /** + * Gets the RMI connector for a new run. + * + * @param creator + * Who is creating the workflow run. + * @param workflow + * What workflow are they instantiating. + * @param id + * The identity token for the run, newly minted. + * @return The remote interface to the run. + * @throws Exception + * Just about anything can go wrong... + */ + protected abstract RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception; + + /** + * How to convert a wrapped workflow into XML. + * + * @param workflow + * The wrapped workflow. + * @return The XML version of the document. + * @throws JAXBException + * If serialization fails. + */ + protected byte[] serializeWorkflow(Workflow workflow) throws JAXBException { + try { + return workflow.getScufl2Bytes(); + } catch (IOException e) { + throw new JAXBException("problem converting to scufl2", e); + } catch (WriterException e) { + throw new JAXBException("problem converting to scufl2", e); + } + } + + private void acceptUsageRecord(String usageRecord) { + if (usageRecordSink != null) + usageRecordSink.storeUsageRecord(usageRecord); + runDB.checkForFinishNow(); + } + + /** + * Make a Remote object that can act as a consumer for usage records. + * + * @param creator + * + * @return The receiver, or <tt>null</tt> if the construction fails. 
+ */ + protected UsageRecordReceiver makeURReciver(UsernamePrincipal creator) { + try { + @SuppressWarnings("serial") + class URReceiver extends UnicastRemoteObject implements + UsageRecordReceiver { + public URReceiver() throws RemoteException { + super(); + } + + @Override + public void acceptUsageRecord(String usageRecord) { + AbstractRemoteRunFactory.this.acceptUsageRecord(usageRecord); + } + } + return new URReceiver(); + } catch (RemoteException e) { + log.warn("failed to build usage record receiver", e); + return null; + } + } + + @Override + public EventDAO getMasterEventFeed() { + return masterEventFeed; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/ForkRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/ForkRunFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/ForkRunFactory.java new file mode 100644 index 0000000..3f48644 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/ForkRunFactory.java @@ -0,0 +1,336 @@ +/* + */ +package org.taverna.server.master.localworker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.lang.System.getProperty; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.Calendar.SECOND; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; + +import java.io.File; +import java.rmi.ConnectException; +import java.rmi.ConnectIOException; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.Calendar; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.xml.bind.JAXBException; + +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.factories.ConfigurableRunFactory; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * A simple factory for workflow runs that forks runs from a subprocess. 
+ * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for simple singleton forked run.") +public class ForkRunFactory extends AbstractRemoteRunFactory implements + ConfigurableRunFactory { + private int lastStartupCheckCount; + private Integer lastExitCode; + private RemoteRunFactory factory; + private Process factoryProcess; + private String factoryProcessName; + + /** + * Create a factory for remote runs that works by forking off a subprocess. + * + * @throws JAXBException + * Shouldn't happen. + */ + public ForkRunFactory() throws JAXBException { + } + + @PostConstruct + protected void initRegistry() { + log.info("waiting for availability of default RMI registry"); + getTheRegistry(); + } + + @Override + protected void reinitFactory() { + boolean makeFactory = factory != null; + killFactory(); + try { + if (makeFactory) + initFactory(); + } catch (Exception e) { + log.fatal("failed to make connection to remote run factory", e); + } + } + + private RemoteRunFactory getFactory() throws RemoteException { + try { + initFactory(); + } catch (RemoteException e) { + throw e; + } catch (Exception e) { + throw new RemoteException("problem constructing factory", e); + } + return factory; + } + + /** + * @return How many checks were done for the worker process the last time a + * spawn was tried. + */ + @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) + @Override + public int getLastStartupCheckCount() { + return lastStartupCheckCount; + } + + /** + * @return What was the exit code from the last time the factory subprocess + * was killed? + */ + @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") + @Override + public Integer getLastExitCode() { + return lastExitCode; + } + + /** + * @return What the factory subprocess's main RMI interface is registered + * as. 
+ */ + @ManagedAttribute(description = "What the factory subprocess's main RMI interface is registered as.", currencyTimeLimit = 60) + @Override + public String getFactoryProcessName() { + return factoryProcessName; + } + + /** + * Makes the subprocess that manufactures runs. + * + * @throws Exception + * If anything goes wrong. + */ + public void initFactory() throws Exception { + if (factory != null) + return; + // Generate the arguments to use when spawning the subprocess + factoryProcessName = state.getFactoryProcessNamePrefix() + randomUUID(); + ProcessBuilder p = new ProcessBuilder(getJavaBinary()); + p.command().add("-jar"); + p.command().add(getServerWorkerJar()); + if (getExecuteWorkflowScript() == null) + log.fatal("no execute workflow script"); + p.command().add(getExecuteWorkflowScript()); + p.command().addAll(asList(getExtraArguments())); + p.command().add(factoryProcessName); + p.redirectErrorStream(true); + p.directory(new File(getProperty("javax.servlet.context.tempdir", + getProperty("java.io.tmpdir")))); + + // Spawn the subprocess + log.info("about to create subprocess: " + p.command()); + + factoryProcess = launchSubprocess(p); + outlog = new StreamLogger("FactoryStdout", factoryProcess.getInputStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + errlog = new StreamLogger("FactoryStderr", factoryProcess.getErrorStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + + // Wait for the subprocess to register itself in the RMI registry + Calendar deadline = Calendar.getInstance(); + deadline.add(SECOND, state.getWaitSeconds()); + Exception lastException = null; + lastStartupCheckCount = 0; + while (deadline.after(Calendar.getInstance())) { + try { + sleep(state.getSleepMS()); + lastStartupCheckCount++; + factory = getRemoteFactoryHandle(factoryProcessName); + initInteractionDetails(factory); + return; + } catch (InterruptedException ie) { + continue; + } catch (NotBoundException 
nbe) { + lastException = nbe; + log.info("resource \"" + factoryProcessName + + "\" not yet registered..."); + continue; + } catch (RemoteException re) { + // Unpack a remote exception if we can + lastException = re; + try { + if (re.getCause() != null) + lastException = (Exception) re.getCause(); + } catch (Throwable t) { + // Ignore! + } + } catch (RuntimeException e) { + lastException = e; + } + } + if (lastException == null) + lastException = new InterruptedException(); + throw lastException; + } + + private StreamLogger outlog, errlog; + + private void stopLoggers() { + if (outlog != null) + outlog.stop(); + outlog = null; + if (errlog != null) + errlog.stop(); + errlog = null; + } + + private RemoteRunFactory getRemoteFactoryHandle(String name) + throws RemoteException, NotBoundException { + log.info("about to look up resource called " + name); + try { + // Validate registry connection first + getTheRegistry().list(); + } catch (ConnectException | ConnectIOException e) { + log.warn("connection problems with registry", e); + } + RemoteRunFactory rrf = (RemoteRunFactory) getTheRegistry().lookup(name); + log.info("successfully connected to factory subprocess " + + factoryProcessName); + return rrf; + } + + /** + * Destroys the subprocess that manufactures runs. 
+ */ + @PreDestroy + public void killFactory() { + if (factory != null) { + log.info("requesting shutdown of " + factoryProcessName); + try { + factory.shutdown(); + sleep(700); + } catch (RemoteException e) { + log.warn(factoryProcessName + " failed to shut down nicely", e); + } catch (InterruptedException e) { + if (log.isDebugEnabled()) + log.debug("interrupted during wait after asking " + + factoryProcessName + " to shut down", e); + } finally { + factory = null; + } + } + + if (factoryProcess != null) { + int code = -1; + try { + lastExitCode = code = factoryProcess.exitValue(); + log.info(factoryProcessName + " already dead?"); + } catch (RuntimeException e) { + log.info("trying to force death of " + factoryProcessName); + try { + factoryProcess.destroy(); + sleep(350); // takes a little time, even normally + lastExitCode = code = factoryProcess.exitValue(); + } catch (Exception e2) { + code = -1; + } + } finally { + factoryProcess = null; + stopLoggers(); + } + if (code > 128) { + log.info(factoryProcessName + " died with signal=" + + (code - 128)); + } else if (code >= 0) { + log.info(factoryProcessName + " process killed: code=" + code); + } else { + log.warn(factoryProcessName + " not yet dead"); + } + } + } + + /** + * The real core of the run builder, factored out from its reliability + * support. + * + * @param creator + * Who created this workflow? + * @param wf + * The serialized workflow. + * @return The remote handle of the workflow run. + * @throws RemoteException + * If anything fails (communications error, etc.) 
+ */ + private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, + @Nonnull byte[] wf, UUID id) throws RemoteException { + @Nonnull + String globaluser = "Unknown Person"; + if (creator != null) + globaluser = creator.getName(); + RemoteSingleRun rsr = getFactory().make(wf, globaluser, + makeURReciver(creator), id); + incrementRunCount(); + return rsr; + } + + @Override + protected RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception { + @Nonnull + byte[] wf = serializeWorkflow(workflow); + for (int i = 0; i < 3; i++) { + initFactory(); + try { + return getRealRun(creator, wf, id); + } catch (ConnectException | ConnectIOException e) { + // factory was lost; try to recreate + } + killFactory(); + } + throw new NoCreateException("total failure to connect to factory " + + factoryProcessName + "despite attempting restart"); + } + + @Override + public String[] getFactoryProcessMapping() { + return new String[0]; + } + + @Override + protected int operatingCount() throws Exception { + return getFactory().countOperatingRuns(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/IdAwareForkRunFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/IdAwareForkRunFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/IdAwareForkRunFactory.java new file mode 100644 index 0000000..a2e5ff7 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/IdAwareForkRunFactory.java @@ -0,0 +1,529 @@ +/* + */ +package org.taverna.server.master.localworker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static java.lang.System.getProperty; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.Calendar.SECOND; +import static java.util.UUID.randomUUID; +import static org.taverna.server.master.TavernaServer.JMX_ROOT; +import static org.taverna.server.master.localworker.AbstractRemoteRunFactory.launchSubprocess; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.rmi.ConnectException; +import java.rmi.ConnectIOException; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.xml.bind.JAXBException; + +import org.apache.commons.logging.Log; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import 
org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; +import org.taverna.server.localworker.remote.RemoteRunFactory; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.factories.ConfigurableRunFactory; +import org.taverna.server.master.interfaces.LocalIdentityMapper; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * A simple factory for workflow runs that forks runs from a subprocess. + * + * @author Donal Fellows + */ +@ManagedResource(objectName = JMX_ROOT + "RunFactory", description = "The factory for a user-specific forked run.") +public class IdAwareForkRunFactory extends AbstractRemoteRunFactory implements + ConfigurableRunFactory { + private MetaFactory forker; + private Map<String, RemoteRunFactory> factory; + private Map<String, String> factoryProcessName; + + /** + * Create a factory for remote runs that works by forking off a subprocess. + * + * @throws JAXBException + * Shouldn't happen. + */ + public IdAwareForkRunFactory() throws JAXBException { + factory = new HashMap<>(); + factoryProcessName = new HashMap<>(); + } + + @Override + protected void reinitFactory() { + boolean makeForker = forker != null; + try { + killForker(); + } catch (Exception e) { + log.warn("exception when killing secure-fork process", e); + } + try { + if (makeForker) + initMetaFactory(); + } catch (Exception e) { + log.fatal("failed to make secure-fork process", e); + } + } + + /** + * @return How many checks were done for the worker process the last time a + * spawn was tried. + */ + @Override + @ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60) + public int getLastStartupCheckCount() { + return forker == null ? 
0 : forker.lastStartupCheckCount(); + } + + /** + * @return What was the exit code from the last time the factory subprocess + * was killed? + */ + @Override + @ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?") + public Integer getLastExitCode() { + return forker == null ? null : forker.lastExitCode(); + } + + /** + * @return The mapping of user names to RMI factory IDs. + */ + @Override + @ManagedAttribute(description = "The mapping of user names to RMI factory IDs.", currencyTimeLimit = 60) + public String[] getFactoryProcessMapping() { + ArrayList<String> result = new ArrayList<>(); + ArrayList<String> keys = new ArrayList<>(factoryProcessName.keySet()); + String[] ks = keys.toArray(new String[keys.size()]); + Arrays.sort(ks); + for (String k : ks) { + result.add(k); + result.add(factoryProcessName.get(k)); + } + return result.toArray(new String[result.size()]); + } + + /** + * How construction of factories is actually done. + * + * @author Donal Fellows + */ + public interface MetaFactory { + /** + * Make a factory for the given user. + * + * @param username + * Who to make it for. + * @return Handle of the factory. + * @throws Exception + * If anything goes wrong. + */ + RemoteRunFactory make(String username) throws Exception; + + /** + * Shut down the meta-factory. It is not defined whether factories + * created by it are also shut down at the same time. + * + * @throws IOException + * If something goes wrong when communicating with the + * meta-factory. + * @throws InterruptedException + * If something stops us waiting for the shut down to + * happen. + */ + void close() throws IOException, InterruptedException; + + int lastStartupCheckCount(); + + Integer lastExitCode(); + } + + void registerFactory(String username, String fpn, RemoteRunFactory f) { + factoryProcessName.put(username, fpn); + factory.put(username, f); + } + + /** + * Makes the connection to the meta-factory that makes factories. 
+ * + * @throws IOException + * If the connection fails. + */ + @PostConstruct + void initMetaFactory() throws IOException { + log.info("waiting for availability of default RMI registry"); + getTheRegistry(); + log.info("constructing secure fork subprocess"); + forker = new SecureFork(this, state, log); + } + + private void killForker() throws IOException, InterruptedException { + try { + if (forker != null) + forker.close(); + } finally { + forker = null; + } + } + + /** + * Makes the subprocess that manufactures runs. + * + * @throws Exception + * If anything goes wrong. + */ + private void initFactory(String username) throws Exception { + if (factory.containsKey(username)) + return; + if (forker == null) + initMetaFactory(); + forker.make(username); + } + + /** + * Destroys the subprocess that manufactures runs. + */ + @PreDestroy + public void killFactories() { + if (!factory.isEmpty()) { + Iterator<String> keys = factory.keySet().iterator(); + while (keys.hasNext()) { + String key = keys.next(); + log.info("requesting shutdown of " + + factoryProcessName.get(key)); + try { + factory.get(key).shutdown(); + } catch (RemoteException e) { + log.warn(factoryProcessName.get(key) + + " failed to shut down nicely", e); + } finally { + keys.remove(); + factoryProcessName.remove(key); + } + } + try { + sleep(700); + } catch (InterruptedException e) { + if (log.isDebugEnabled()) + log.debug("interrupted during wait after " + + "asking factories to shut down", e); + } + } + + try { + killForker(); + } catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("exception in shutdown of secure-fork process", e); + } + } + + @Override + protected void finalize() throws Throwable { + killFactories(); + super.finalize(); + } + + @Autowired + public void setIdMapper(LocalIdentityMapper mapper) { + this.mapper = mapper; + } + + private LocalIdentityMapper mapper; + + /** + * The real core of the run builder, factored out from its reliability + * support. 
+ * + * @param creator + * Who created this workflow? + * @param username + * What user account is this workflow to be executed in? + * @param wf + * The serialized workflow. + * @return The remote handle of the workflow run. + * @throws RemoteException + * If anything fails (communications error, etc.) + */ + private RemoteSingleRun getRealRun(@Nonnull UsernamePrincipal creator, + @Nonnull String username, @Nonnull byte[] wf, UUID id) + throws RemoteException { + String globaluser = "Unknown Person"; + if (creator != null) + globaluser = creator.getName(); + RemoteSingleRun rsr = factory.get(username).make(wf, globaluser, + makeURReciver(creator), id); + incrementRunCount(); + return rsr; + } + + @Override + protected RemoteSingleRun getRealRun(UsernamePrincipal creator, + Workflow workflow, UUID id) throws Exception { + byte[] wf = serializeWorkflow(workflow); + String username = mapper == null ? null : mapper + .getUsernameForPrincipal(creator); + if (username == null) + throw new Exception("cannot determine who to run workflow as; " + + "local identity mapper returned null"); + for (int i = 0; i < 3; i++) { + if (!factory.containsKey(username)) + initFactory(username); + try { + return getRealRun(creator, username, wf, id); + } catch (ConnectException | ConnectIOException e) { + // factory was lost; try to recreate + } + factory.remove(username); + } + throw new NoCreateException("total failure to connect to factory " + + factoryProcessName + "despite attempting restart"); + } + + @Value("${secureForkPasswordFile}") + @Order(20) + public void setPasswordSource(String passwordSource) { + if (passwordSource == null || passwordSource.isEmpty() + || passwordSource.startsWith("${")) + state.setDefaultPasswordFile(null); + else + state.setDefaultPasswordFile(passwordSource); + if (state.getPasswordFile() == null) + log.info("assuming password-free forking enabled"); + else + log.info("configured secureForkPasswordFile from context as " + + state.getPasswordFile()); + 
} + + @Override + public String getFactoryProcessName() { + return "<PROPERTY-NOT-SUPPORTED>"; + } + + @Override + protected int operatingCount() throws Exception { + int total = 0; + for (RemoteRunFactory rrf : factory.values()) + total += rrf.countOperatingRuns(); + return total; + } +} + +/** + * The connector that handles the secure fork process itself. + * + * @author Donal Fellows + */ +class SecureFork implements IdAwareForkRunFactory.MetaFactory { + private IdAwareForkRunFactory main; + private Process process; + private PrintWriter channel; + private int lastStartupCheckCount; + private Integer lastExitCode; + private Log log; + private LocalWorkerState state; + private StreamLogger out, err; + + /** + * Construct the command to run the meta-factory process. + * + * @param args + * The live list of arguments to pass. + */ + public void initFactoryArgs(List<String> args) { + args.add(main.getJavaBinary()); + String pwf = main.getPasswordFile(); + if (pwf != null) { + args.add("-Dpassword.file=" + pwf); + } + args.add("-jar"); + args.add(main.getServerForkerJar()); + args.add(main.getJavaBinary()); + args.add("-jar"); + args.add(main.getServerWorkerJar()); + if (main.getExecuteWorkflowScript() == null) + log.fatal("no execute workflow script"); + args.add(main.getExecuteWorkflowScript()); + args.addAll(asList(main.getExtraArguments())); + } + + SecureFork(IdAwareForkRunFactory main, LocalWorkerState state, Log log) + throws IOException { + this.main = main; + this.log = log; + this.state = state; + ProcessBuilder p = new ProcessBuilder(); + initFactoryArgs(p.command()); + p.redirectErrorStream(true); + p.directory(new File(getProperty("javax.servlet.context.tempdir", + getProperty("java.io.tmpdir")))); + + // Spawn the subprocess + log.info("about to create subprocess: " + p.command()); + log.info("subprocess directory: " + p.directory()); + process = launchSubprocess(p); + channel = new PrintWriter(new BufferedWriter(new OutputStreamWriter( + 
process.getOutputStream())), true); + // Log the responses + out = new StreamLogger("ForkedStdout", process.getInputStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + err = new StreamLogger("ForkedStderr", process.getErrorStream()) { + @Override + protected void write(String msg) { + log.info(msg); + } + }; + } + + @Override + public void close() throws IOException, InterruptedException { + try { + if (process != null) { + log.info("about to close down subprocess"); + channel.close(); + int code = -1; + try { + try { + code = process.exitValue(); + log.info("secure-fork process already dead?"); + } catch (IllegalThreadStateException e) { + try { + code = process.waitFor(); + } catch (InterruptedException e1) { + log.info("interrupted waiting for natural death of secure-fork process?!"); + process.destroy(); + code = process.waitFor(); + } + } + } finally { + lastExitCode = code; + if (code > 128) { + log.info("secure-fork process died with signal=" + + (code - 128)); + } else if (code >= 0) { + log.info("secure-fork process killed: code=" + code); + } else { + log.warn("secure-fork process not yet dead"); + } + } + } + } finally { + process = null; + channel = null; + out.stop(); + err.stop(); + } + } + + protected void make(String username, String fpn) { + log.info("about to request subprocess creation for " + username + + " producing ID " + fpn); + channel.println(username + " " + fpn); + } + + @Override + public RemoteRunFactory make(String username) throws Exception { + try { + main.getTheRegistry().list(); // Validate registry connection first + } catch (ConnectException | ConnectIOException e) { + log.warn("connection problems with registry", e); + } catch (RemoteException e) { + if (e.getCause() != null && e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } + log.warn("connection problems with registry", e); + } + + String fpn = state.getFactoryProcessNamePrefix() + randomUUID(); + make(username, 
fpn); + + // Wait for the subprocess to register itself in the RMI registry + Calendar deadline = Calendar.getInstance(); + deadline.add(SECOND, state.getWaitSeconds()); + Exception lastException = null; + lastStartupCheckCount = 0; + while (deadline.after(Calendar.getInstance())) { + try { + sleep(state.getSleepMS()); + lastStartupCheckCount++; + log.info("about to look up resource called " + fpn); + RemoteRunFactory f = (RemoteRunFactory) main.getTheRegistry() + .lookup(fpn); + log.info("successfully connected to factory subprocess " + fpn); + main.initInteractionDetails(f); + main.registerFactory(username, fpn, f); + return f; + } catch (InterruptedException ie) { + continue; + } catch (NotBoundException nbe) { + lastException = nbe; + log.info("resource \"" + fpn + "\" not yet registered..."); + continue; + } catch (RemoteException re) { + // Unpack a remote exception if we can + lastException = re; + try { + if (re.getCause() != null) + lastException = (Exception) re.getCause(); + } catch (Throwable t) { + // Ignore! 
+ } + } catch (Exception e) { + lastException = e; + } + } + if (lastException == null) + lastException = new InterruptedException(); + throw lastException; + } + + @Override + public Integer lastExitCode() { + return lastExitCode; + } + + @Override + public int lastStartupCheckCount() { + return lastStartupCheckCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerFactory.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerFactory.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerFactory.java new file mode 100644 index 0000000..803c201 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerFactory.java @@ -0,0 +1,44 @@ +/* + */ +package org.taverna.server.master.localworker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Provider of the configuration of the "localworker.factory" bean, which is + * sufficiently complex to be too hard to manufacture directly from the XML + * configuration. + * + * @author Donal Fellows + */ +@Configuration +public class LocalWorkerFactory { + @Bean(name = "localworker.factory") + AbstractRemoteRunFactory getLocalworkerFactory( + @Value("${backEndFactory}") String mode) throws Exception { + if (mode == null || mode.isEmpty() || mode.startsWith("${")) + throw new Exception("no value for ${backEndFactory}"); + Class<?> c = Class.forName(mode); + if (AbstractRemoteRunFactory.class.isAssignableFrom(c)) + return (AbstractRemoteRunFactory) c.newInstance(); + throw new Exception("unknown remote run factory: " + mode); + } +}
