http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java new file mode 100644 index 0000000..1868f94 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java @@ -0,0 +1,58 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * How to convert a notification about the completion of a job into a message. + * + * @author Donal Fellows + */ +public interface CompletionNotifier { + /** + * @return The name of this notifier. + */ + String getName(); + + /** + * Called to get the content of a message that a workflow run has finished. + * + * @param name + * The name of the run. + * @param run + * What run are we talking about. + * @param code + * What the exit code was. 
+ * @return The plain-text content of the message. + */ + String makeCompletionMessage(String name, RemoteRunDelegate run, int code); + + /** + * Called to get the subject of the message to dispatch. + * + * @param name + * The name of the run. + * @param run + * What run are we talking about. + * @param code + * What the exit code was. + * @return The plain-text subject of the message. + */ + String makeMessageSubject(String name, RemoteRunDelegate run, int code); +}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java new file mode 100644 index 0000000..d38f0cc --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java @@ -0,0 +1,39 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.taverna.server.master.notification.atom.EventDAO; + +/** + * What the remote run really needs of its factory. + * + * @author Donal Fellows + */ +public interface FactoryBean { + /** + * @return Whether a run can actually be started at this time. 
+ */ + boolean isAllowingRunsToStart(); + + /** + * @return a handle to the master Atom event feed (<i>not</i> the per-run + * feed) + */ + EventDAO getMasterEventFeed(); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java new file mode 100644 index 0000000..649db64 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java @@ -0,0 +1,73 @@ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A simple password issuing bean. 
+ * + * @author Donal Fellows + */ +public class PasswordIssuer { + private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', + 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', + 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', + 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', + 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', '3', '4', '5', '6', '7', + '8', '9', '0', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', + ',', '.', '<', '>', '/', '?', ':', ';', '-', '_', '+', '[', ']', + '{', '}', '`', '~' }; + private Log log = LogFactory.getLog("Taverna.Server.Worker"); + private SecureRandom r; + private int length; + + public PasswordIssuer() { + r = new SecureRandom(); + log.info("constructing passwords with " + r.getAlgorithm()); + setLength(8); + } + + public PasswordIssuer(String algorithm) throws NoSuchAlgorithmException { + r = SecureRandom.getInstance(algorithm); + log.info("constructing passwords with " + r.getAlgorithm()); + setLength(8); + } + + public void setLength(int length) { + this.length = length; + log.info("issued password will be " + this.length + + " symbols chosen from " + ALPHABET.length); + } + + /** + * Issue a password. + * + * @return The new password. 
+ */ + public String issue() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + sb.append(ALPHABET[r.nextInt(ALPHABET.length)]); + log.info("issued new password of length " + sb.length()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java new file mode 100644 index 0000000..37d5760 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java @@ -0,0 +1,171 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static org.taverna.server.master.identity.WorkflowInternalAuthProvider.PREFIX; + +import java.net.URI; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Required; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.GrantedAuthority; +import org.springframework.security.core.context.SecurityContextHolder; +import org.taverna.server.master.common.Roles; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.exceptions.NoCreateException; +import org.taverna.server.master.exceptions.NoDestroyException; +import org.taverna.server.master.exceptions.NoUpdateException; +import org.taverna.server.master.interfaces.Policy; +import org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.interfaces.TavernaSecurityContext; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Basic policy implementation that allows any workflow to be instantiated by + * any user, but which does not permit users to access each others workflow + * runs. It also imposes a global limit on the number of workflow runs at once. 
+ * + * @author Donal Fellows + */ +class PolicyImpl implements Policy { + Log log = LogFactory.getLog("Taverna.Server.Worker.Policy"); + private PolicyLimits limits; + private RunDBSupport runDB; + + @Required + public void setLimits(PolicyLimits limits) { + this.limits = limits; + } + + @Required + public void setRunDB(RunDBSupport runDB) { + this.runDB = runDB; + } + + @Override + public int getMaxRuns() { + return limits.getMaxRuns(); + } + + @Override + public Integer getMaxRuns(UsernamePrincipal user) { + return null; + } + + @Override + public int getOperatingLimit() { + return limits.getOperatingLimit(); + } + + @Override + public List<URI> listPermittedWorkflowURIs(UsernamePrincipal user) { + return limits.getPermittedWorkflowURIs(); + } + + private boolean isSelfAccess(String runId) { + Authentication auth = SecurityContextHolder.getContext() + .getAuthentication(); + boolean self = false; + String id = null; + for (GrantedAuthority a : auth.getAuthorities()) { + String aa = a.getAuthority(); + if (aa.equals(Roles.SELF)) { + self = true; + continue; + } + if (!aa.startsWith(PREFIX)) + continue; + id = aa.substring(PREFIX.length()); + } + return self && runId.equals(id); + } + + @Override + public boolean permitAccess(UsernamePrincipal user, TavernaRun run) { + String username = user.getName(); + TavernaSecurityContext context = run.getSecurityContext(); + if (context.getOwner().getName().equals(username)) { + if (log.isDebugEnabled()) + log.debug("granted access by " + user.getName() + " to " + + run.getId()); + return true; + } + if (isSelfAccess(run.getId())) { + if (log.isDebugEnabled()) + log.debug("access by workflow to itself: " + run.getId()); + return true; + } + if (log.isDebugEnabled()) + log.debug("considering access by " + user.getName() + " to " + + run.getId()); + return context.getPermittedReaders().contains(username); + } + + @Override + public void permitCreate(UsernamePrincipal user, Workflow workflow) + throws NoCreateException { + if 
(user == null) + throw new NoCreateException( + "anonymous workflow creation not allowed"); + if (runDB.countRuns() >= getMaxRuns()) + throw new NoCreateException("server load exceeded; please wait"); + } + + @Override + public synchronized void permitDestroy(UsernamePrincipal user, TavernaRun run) + throws NoDestroyException { + if (user == null) + throw new NoDestroyException(); + String username = user.getName(); + TavernaSecurityContext context = run.getSecurityContext(); + if (context.getOwner() == null + || context.getOwner().getName().equals(username)) + return; + if (!context.getPermittedDestroyers().contains(username)) + throw new NoDestroyException(); + } + + @Override + public void permitUpdate(UsernamePrincipal user, TavernaRun run) + throws NoUpdateException { + if (user == null) + throw new NoUpdateException( + "workflow run not owned by you and you're not granted access"); + TavernaSecurityContext context = run.getSecurityContext(); + if (context.getOwner().getName().equals(user.getName())) + return; + if (isSelfAccess(run.getId())) { + if (log.isDebugEnabled()) + log.debug("update access by workflow to itself: " + run.getId()); + return; + } + if (!context.getPermittedUpdaters().contains(user.getName())) + throw new NoUpdateException( + "workflow run not owned by you and you're not granted access"); + } + + @Override + public void setPermittedWorkflowURIs(UsernamePrincipal user, + List<URI> permitted) { + limits.setPermittedWorkflowURIs(permitted); + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java new file mode 100644 index 0000000..43c0aa4 --- /dev/null 
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java @@ -0,0 +1,56 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.net.URI; +import java.util.List; + +import org.taverna.server.master.common.Status; + +/** + * The worker policy delegates certain limits to the state model of the + * particular worker. + * + * @author Donal Fellows + */ +public interface PolicyLimits { + /** + * @return the maximum number of extant workflow runs in any state + */ + int getMaxRuns(); + + /** + * @return the maximum number of workflow runs in the + * {@linkplain Status#Operating operating} state. + */ + int getOperatingLimit(); + + /** + * @return the list of URIs to workflows that may be used to create workflow + * runs. If empty or <tt>null</tt>, no restriction is present. + */ + List<URI> getPermittedWorkflowURIs(); + + /** + * @param permitted + * the list of URIs to workflows that may be used to create + * workflow runs. 
+ */ + void setPermittedWorkflowURIs(List<URI> permitted); +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java new file mode 100644 index 0000000..fb1ac47 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java @@ -0,0 +1,980 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static java.lang.System.currentTimeMillis; +import static java.util.Calendar.MINUTE; +import static java.util.Collections.sort; +import static java.util.Collections.unmodifiableSet; +import static java.util.UUID.randomUUID; +import static org.apache.commons.io.IOUtils.closeQuietly; +import static org.apache.commons.logging.LogFactory.getLog; +import static org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename; +import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.PipedOutputStream; +import java.rmi.MarshalledObject; +import java.rmi.RemoteException; +import java.security.GeneralSecurityException; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collection; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import javax.annotation.Nonnull; + +import org.apache.commons.logging.Log; +import org.taverna.server.localworker.remote.IllegalStateTransitionException; +import org.taverna.server.localworker.remote.ImplementationException; +import org.taverna.server.localworker.remote.RemoteDirectory; +import org.taverna.server.localworker.remote.RemoteDirectoryEntry; +import org.taverna.server.localworker.remote.RemoteFile; +import org.taverna.server.localworker.remote.RemoteInput; +import org.taverna.server.localworker.remote.RemoteListener; +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.localworker.remote.RemoteStatus; +import org.taverna.server.localworker.remote.StillWorkingOnItException; +import org.taverna.server.master.common.Status; +import org.taverna.server.master.common.Workflow; +import 
org.taverna.server.master.exceptions.BadPropertyValueException; +import org.taverna.server.master.exceptions.BadStateChangeException; +import org.taverna.server.master.exceptions.FilesystemAccessException; +import org.taverna.server.master.exceptions.NoListenerException; +import org.taverna.server.master.exceptions.OverloadedException; +import org.taverna.server.master.exceptions.UnknownRunException; +import org.taverna.server.master.interfaces.Directory; +import org.taverna.server.master.interfaces.DirectoryEntry; +import org.taverna.server.master.interfaces.File; +import org.taverna.server.master.interfaces.Input; +import org.taverna.server.master.interfaces.Listener; +import org.taverna.server.master.interfaces.SecurityContextFactory; +import org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.interfaces.TavernaSecurityContext; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * Bridging shim between the WebApp world and the RMI world. 
+ * + * @author Donal Fellows + */ +@SuppressWarnings("serial") +public class RemoteRunDelegate implements TavernaRun { + private transient Log log = getLog("Taverna.Server.Worker"); + transient TavernaSecurityContext secContext; + Date creationInstant; + Workflow workflow; + Date expiry; + HashSet<String> readers; + HashSet<String> writers; + HashSet<String> destroyers; + transient String id; + transient RemoteSingleRun run; + transient RunDBSupport db; + transient FactoryBean factory; + boolean doneTransitionToFinished; + boolean generateProvenance;// FIXME expose + String name; + private static final String ELLIPSIS = "..."; + + public RemoteRunDelegate(Date creationInstant, Workflow workflow, + RemoteSingleRun rsr, int defaultLifetime, RunDBSupport db, UUID id, + boolean generateProvenance, FactoryBean factory) { + if (rsr == null) + throw new IllegalArgumentException("remote run must not be null"); + this.creationInstant = creationInstant; + this.workflow = workflow; + Calendar c = Calendar.getInstance(); + c.add(MINUTE, defaultLifetime); + this.expiry = c.getTime(); + this.run = rsr; + this.db = db; + this.generateProvenance = generateProvenance; + this.factory = factory; + try { + this.name = ""; + String ci = " " + creationInstant; + String n = workflow.getName(); + if (n.length() > NAME_LENGTH - ci.length()) + n = n.substring(0, + NAME_LENGTH - ci.length() - ELLIPSIS.length()) + + ELLIPSIS; + this.name = n + ci; + } catch (Exception e) { + // Ignore; it's just a name, not something important. + } + if (id != null) + this.id = id.toString(); + } + + RemoteRunDelegate() { + } + + /** + * Get the types of listener supported by this run. + * + * @return A list of listener type names. + * @throws RemoteException + * If anything goes wrong. 
+ */ + public List<String> getListenerTypes() throws RemoteException { + return run.getListenerTypes(); + } + + @Override + public void addListener(Listener listener) { + if (listener instanceof ListenerDelegate) + try { + run.addListener(((ListenerDelegate) listener).getRemote()); + } catch (RemoteException e) { + log.warn("communication problem adding listener", e); + } catch (ImplementationException e) { + log.warn("implementation problem adding listener", e); + } + else + log.fatal("bad listener " + listener.getClass() + + "; not applicable remotely!"); + } + + @Override + public String getId() { + if (id == null) + id = randomUUID().toString(); + return id; + } + + /** + * Attach a listener to a workflow run and return its local delegate. + * + * @param type + * The type of listener to create. + * @param config + * The configuration of the listener. + * @return The local delegate of the listener. + * @throws NoListenerException + * If anything goes wrong. + */ + public Listener makeListener(String type, String config) + throws NoListenerException { + try { + return new ListenerDelegate(run.makeListener(type, config)); + } catch (RemoteException e) { + throw new NoListenerException("failed to make listener", e); + } + } + + @Override + public void destroy() { + try { + run.destroy(); + } catch (RemoteException | ImplementationException e) { + log.warn("failed to destroy run", e); + } + } + + @Override + public Date getExpiry() { + return new Date(expiry.getTime()); + } + + @Override + public List<Listener> getListeners() { + List<Listener> listeners = new ArrayList<>(); + try { + for (RemoteListener rl : run.getListeners()) + listeners.add(new ListenerDelegate(rl)); + } catch (RemoteException e) { + log.warn("failed to get listeners", e); + } + return listeners; + } + + @Override + public TavernaSecurityContext getSecurityContext() { + return secContext; + } + + @Override + public Status getStatus() { + try { + switch (run.getStatus()) { + case Initialized: + 
return Status.Initialized; + case Operating: + return Status.Operating; + case Stopped: + return Status.Stopped; + case Finished: + return Status.Finished; + } + } catch (RemoteException e) { + log.warn("problem getting remote status", e); + } + return Status.Finished; + } + + @Override + public Workflow getWorkflow() { + return workflow; + } + + @Override + public Directory getWorkingDirectory() throws FilesystemAccessException { + try { + return new DirectoryDelegate(run.getWorkingDirectory()); + } catch (Throwable e) { + if (e.getCause() != null) + e = e.getCause(); + throw new FilesystemAccessException( + "problem getting main working directory handle", e); + } + } + + @Override + public void setExpiry(Date d) { + if (d.after(new Date())) + expiry = new Date(d.getTime()); + db.flushToDisk(this); + } + + @Override + public String setStatus(Status s) throws BadStateChangeException { + try { + log.info("setting status of run " + id + " to " + s); + switch (s) { + case Initialized: + run.setStatus(RemoteStatus.Initialized); + break; + case Operating: + if (run.getStatus() == RemoteStatus.Initialized) { + if (!factory.isAllowingRunsToStart()) + throw new OverloadedException(); + secContext.conveySecurity(); + } + run.setGenerateProvenance(generateProvenance); + run.setStatus(RemoteStatus.Operating); + factory.getMasterEventFeed() + .started( + this, + "started run execution", + "The execution of run '" + getName() + + "' has started."); + break; + case Stopped: + run.setStatus(RemoteStatus.Stopped); + break; + case Finished: + run.setStatus(RemoteStatus.Finished); + break; + } + return null; + } catch (IllegalStateTransitionException e) { + throw new BadStateChangeException(e.getMessage()); + } catch (RemoteException e) { + throw new BadStateChangeException(e.getMessage(), e.getCause()); + } catch (GeneralSecurityException | IOException e) { + throw new BadStateChangeException(e.getMessage(), e); + } catch (ImplementationException e) { + if (e.getCause() != null) + 
throw new BadStateChangeException(e.getMessage(), e.getCause()); + throw new BadStateChangeException(e.getMessage(), e); + } catch (StillWorkingOnItException e) { + log.info("still working on setting status of run " + id + " to " + + s, e); + return e.getMessage(); + } catch (InterruptedException e) { + throw new BadStateChangeException( + "interrupted while waiting to insert notification into database"); + } + } + + static void checkBadFilename(String filename) + throws FilesystemAccessException { + if (filename.startsWith("/")) + throw new FilesystemAccessException("filename may not be absolute"); + if (Arrays.asList(filename.split("/")).contains("..")) + throw new FilesystemAccessException( + "filename may not refer to parent"); + } + + @Override + public String getInputBaclavaFile() { + try { + return run.getInputBaclavaFile(); + } catch (RemoteException e) { + log.warn("problem when fetching input baclava file", e); + return null; + } + } + + @Override + public List<Input> getInputs() { + ArrayList<Input> inputs = new ArrayList<>(); + try { + for (RemoteInput ri : run.getInputs()) + inputs.add(new RunInput(ri)); + } catch (RemoteException e) { + log.warn("problem when fetching list of workflow inputs", e); + } + return inputs; + } + + @Override + public String getOutputBaclavaFile() { + try { + return run.getOutputBaclavaFile(); + } catch (RemoteException e) { + log.warn("problem when fetching output baclava file", e); + return null; + } + } + + @Override + public Input makeInput(String name) throws BadStateChangeException { + try { + return new RunInput(run.makeInput(name)); + } catch (RemoteException e) { + throw new BadStateChangeException("failed to make input", e); + } + } + + @Override + public void setInputBaclavaFile(String filename) + throws FilesystemAccessException, BadStateChangeException { + checkBadFilename(filename); + try { + run.setInputBaclavaFile(filename); + } catch (RemoteException e) { + throw new FilesystemAccessException( + "cannot set 
input baclava file name", e); + } + } + + @Override + public void setOutputBaclavaFile(String filename) + throws FilesystemAccessException, BadStateChangeException { + checkBadFilename(filename); + try { + run.setOutputBaclavaFile(filename); + } catch (RemoteException e) { + throw new FilesystemAccessException( + "cannot set output baclava file name", e); + } + } + + @Override + public Date getCreationTimestamp() { + return creationInstant == null ? null : new Date( + creationInstant.getTime()); + } + + @Override + public Date getFinishTimestamp() { + try { + return run.getFinishTimestamp(); + } catch (RemoteException e) { + log.info("failed to get finish timestamp", e); + return null; + } + } + + @Override + public Date getStartTimestamp() { + try { + return run.getStartTimestamp(); + } catch (RemoteException e) { + log.info("failed to get finish timestamp", e); + return null; + } + } + + /** + * @param readers + * the readers to set + */ + public void setReaders(Set<String> readers) { + this.readers = new HashSet<>(readers); + db.flushToDisk(this); + } + + /** + * @return the readers + */ + public Set<String> getReaders() { + return readers == null ? new HashSet<String>() + : unmodifiableSet(readers); + } + + /** + * @param writers + * the writers to set + */ + public void setWriters(Set<String> writers) { + this.writers = new HashSet<>(writers); + db.flushToDisk(this); + } + + /** + * @return the writers + */ + public Set<String> getWriters() { + return writers == null ? new HashSet<String>() + : unmodifiableSet(writers); + } + + /** + * @param destroyers + * the destroyers to set + */ + public void setDestroyers(Set<String> destroyers) { + this.destroyers = new HashSet<>(destroyers); + db.flushToDisk(this); + } + + /** + * @return the destroyers + */ + public Set<String> getDestroyers() { + return destroyers == null ? 
new HashSet<String>() + : unmodifiableSet(destroyers); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + out.writeUTF(secContext.getOwner().getName()); + out.writeObject(secContext.getFactory()); + out.writeObject(new MarshalledObject<>(run)); + } + + @Override + public boolean getGenerateProvenance() { + return generateProvenance; + } + + @Override + public void setGenerateProvenance(boolean generateProvenance) { + this.generateProvenance = generateProvenance; + db.flushToDisk(this); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, + ClassNotFoundException { + in.defaultReadObject(); + if (log == null) + log = getLog("Taverna.Server.LocalWorker"); + final String creatorName = in.readUTF(); + SecurityContextFactory factory = (SecurityContextFactory) in + .readObject(); + try { + secContext = factory.create(this, + new UsernamePrincipal(creatorName)); + } catch (RuntimeException | IOException e) { + throw e; + } catch (Exception e) { + throw new SecurityContextReconstructionException(e); + } + run = ((MarshalledObject<RemoteSingleRun>) in.readObject()).get(); + } + + public void setSecurityContext(TavernaSecurityContext tavernaSecurityContext) { + secContext = tavernaSecurityContext; + } + + @Override + public String getName() { + return name; + } + + @Override + public void setName(@Nonnull String name) { + if (name.length() > RunConnection.NAME_LENGTH) + this.name = name.substring(0, RunConnection.NAME_LENGTH); + else + this.name = name; + db.flushToDisk(this); + } + + @Override + public void ping() throws UnknownRunException { + try { + run.ping(); + } catch (RemoteException e) { + throw new UnknownRunException(e); + } + } +} + +abstract class DEDelegate implements DirectoryEntry { + Log log = getLog("Taverna.Server.Worker"); + private RemoteDirectoryEntry entry; + private String name; + private String full; + private Date cacheModTime; + private 
long cacheQueryTime = 0L; + + DEDelegate(RemoteDirectoryEntry entry) { + this.entry = entry; + } + + @Override + public void destroy() throws FilesystemAccessException { + try { + entry.destroy(); + } catch (IOException e) { + throw new FilesystemAccessException( + "failed to delete directory entry", e); + } + } + + @Override + public String getFullName() { + if (full != null) + return full; + String n = getName(); + RemoteDirectoryEntry re = entry; + try { + while (true) { + RemoteDirectory parent = re.getContainingDirectory(); + if (parent == null) + break; + n = parent.getName() + "/" + n; + re = parent; + } + } catch (RemoteException e) { + log.warn("failed to generate full name", e); + } + return (full = n); + } + + @Override + public String getName() { + if (name == null) + try { + name = entry.getName(); + } catch (RemoteException e) { + log.error("failed to get name", e); + } + return name; + } + + @Override + public Date getModificationDate() { + if (cacheModTime == null || currentTimeMillis() - cacheQueryTime < 5000) + try { + cacheModTime = entry.getModificationDate(); + cacheQueryTime = currentTimeMillis(); + } catch (RemoteException e) { + log.error("failed to get modification time", e); + } + return cacheModTime; + } + + @Override + public int compareTo(DirectoryEntry de) { + return getFullName().compareTo(de.getFullName()); + } + + @Override + public boolean equals(Object o) { + return o != null && o instanceof DEDelegate + && getFullName().equals(((DEDelegate) o).getFullName()); + } + + @Override + public int hashCode() { + return getFullName().hashCode(); + } +} + +class DirectoryDelegate extends DEDelegate implements Directory { + RemoteDirectory rd; + + DirectoryDelegate(RemoteDirectory dir) { + super(dir); + rd = dir; + } + + @Override + public Collection<DirectoryEntry> getContents() + throws FilesystemAccessException { + ArrayList<DirectoryEntry> result = new ArrayList<>(); + try { + for (RemoteDirectoryEntry rde : rd.getContents()) { + if 
(rde instanceof RemoteDirectory) + result.add(new DirectoryDelegate((RemoteDirectory) rde)); + else + result.add(new FileDelegate((RemoteFile) rde)); + } + } catch (IOException e) { + throw new FilesystemAccessException( + "failed to get directory contents", e); + } + return result; + } + + @Override + public Collection<DirectoryEntry> getContentsByDate() + throws FilesystemAccessException { + ArrayList<DirectoryEntry> result = new ArrayList<>(getContents()); + sort(result, new DateComparator()); + return result; + } + + static class DateComparator implements Comparator<DirectoryEntry> { + @Override + public int compare(DirectoryEntry a, DirectoryEntry b) { + return a.getModificationDate().compareTo(b.getModificationDate()); + } + } + + @Override + public File makeEmptyFile(Principal actor, String name) + throws FilesystemAccessException { + try { + return new FileDelegate(rd.makeEmptyFile(name)); + } catch (IOException e) { + throw new FilesystemAccessException("failed to make empty file", e); + } + } + + @Override + public Directory makeSubdirectory(Principal actor, String name) + throws FilesystemAccessException { + try { + return new DirectoryDelegate(rd.makeSubdirectory(name)); + } catch (IOException e) { + throw new FilesystemAccessException("failed to make subdirectory", + e); + } + } + + @Override + public ZipStream getContentsAsZip() throws FilesystemAccessException { + ZipStream zs = new ZipStream(); + + final ZipOutputStream zos; + try { + zos = new ZipOutputStream(new PipedOutputStream(zs)); + } catch (IOException e) { + throw new FilesystemAccessException("problem building zip stream", + e); + } + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + zipDirectory(rd, null, zos); + } catch (IOException e) { + log.warn("problem when zipping directory", e); + } finally { + closeQuietly(zos); + } + } + }); + t.setDaemon(true); + t.start(); + return zs; + } + + /** + * Compresses a directory tree into a ZIP. 
+ * + * @param dir + * The directory to compress. + * @param base + * The base name of the directory (or <tt>null</tt> if this is + * the root directory of the ZIP). + * @param zos + * Where to write the compressed data. + * @throws RemoteException + * If some kind of problem happens with the remote delegates. + * @throws IOException + * If we run into problems with reading or writing data. + */ + void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos) + throws RemoteException, IOException { + for (RemoteDirectoryEntry rde : dir.getContents()) { + String name = rde.getName(); + if (base != null) + name = base + "/" + name; + if (rde instanceof RemoteDirectory) { + RemoteDirectory rd = (RemoteDirectory) rde; + zipDirectory(rd, name, zos); + } else { + RemoteFile rf = (RemoteFile) rde; + zos.putNextEntry(new ZipEntry(name)); + try { + int off = 0; + while (true) { + byte[] c = rf.getContents(off, 64 * 1024); + if (c == null || c.length == 0) + break; + zos.write(c); + off += c.length; + } + } finally { + zos.closeEntry(); + } + } + } + } +} + +class FileDelegate extends DEDelegate implements File { + RemoteFile rf; + + FileDelegate(RemoteFile f) { + super(f); + this.rf = f; + } + + @Override + public byte[] getContents(int offset, int length) + throws FilesystemAccessException { + try { + return rf.getContents(offset, length); + } catch (IOException e) { + throw new FilesystemAccessException("failed to read file contents", + e); + } + } + + @Override + public long getSize() throws FilesystemAccessException { + try { + return rf.getSize(); + } catch (IOException e) { + throw new FilesystemAccessException("failed to get file length", e); + } + } + + @Override + public void setContents(byte[] data) throws FilesystemAccessException { + try { + rf.setContents(data); + } catch (IOException e) { + throw new FilesystemAccessException( + "failed to write file contents", e); + } + } + + @Override + public void appendContents(byte[] data) throws 
FilesystemAccessException { + try { + rf.appendContents(data); + } catch (IOException e) { + throw new FilesystemAccessException( + "failed to write file contents", e); + } + } + + @Override + public void copy(File from) throws FilesystemAccessException { + FileDelegate fromFile; + try { + fromFile = (FileDelegate) from; + } catch (ClassCastException e) { + throw new FilesystemAccessException("different types of File?!"); + } + + try { + rf.copy(fromFile.rf); + } catch (Exception e) { + throw new FilesystemAccessException("failed to copy file contents", + e); + } + return; + } +} + +class ListenerDelegate implements Listener { + private Log log = getLog("Taverna.Server.Worker"); + private RemoteListener r; + String conf; + + ListenerDelegate(RemoteListener l) { + r = l; + } + + RemoteListener getRemote() { + return r; + } + + @Override + public String getConfiguration() { + try { + if (conf == null) + conf = r.getConfiguration(); + } catch (RemoteException e) { + log.warn("failed to get configuration", e); + } + return conf; + } + + @Override + public String getName() { + try { + return r.getName(); + } catch (RemoteException e) { + log.warn("failed to get name", e); + return "UNKNOWN NAME"; + } + } + + @Override + public String getProperty(String propName) throws NoListenerException { + try { + return r.getProperty(propName); + } catch (RemoteException e) { + throw new NoListenerException("no such property: " + propName, e); + } + } + + @Override + public String getType() { + try { + return r.getType(); + } catch (RemoteException e) { + log.warn("failed to get type", e); + return "UNKNOWN TYPE"; + } + } + + @Override + public String[] listProperties() { + try { + return r.listProperties(); + } catch (RemoteException e) { + log.warn("failed to list properties", e); + return new String[0]; + } + } + + @Override + public void setProperty(String propName, String value) + throws NoListenerException, BadPropertyValueException { + try { + r.setProperty(propName, value); 
+ } catch (RemoteException e) { + log.warn("failed to set property", e); + if (e.getCause() != null + && e.getCause() instanceof RuntimeException) + throw new NoListenerException("failed to set property", + e.getCause()); + if (e.getCause() != null && e.getCause() instanceof Exception) + throw new BadPropertyValueException("failed to set property", + e.getCause()); + throw new BadPropertyValueException("failed to set property", e); + } + } +} + +class RunInput implements Input { + private final RemoteInput i; + + RunInput(RemoteInput remote) { + this.i = remote; + } + + @Override + public String getFile() { + try { + return i.getFile(); + } catch (RemoteException e) { + return null; + } + } + + @Override + public String getName() { + try { + return i.getName(); + } catch (RemoteException e) { + return null; + } + } + + @Override + public String getValue() { + try { + return i.getValue(); + } catch (RemoteException e) { + return null; + } + } + + @Override + public void setFile(String file) throws FilesystemAccessException, + BadStateChangeException { + checkBadFilename(file); + try { + i.setFile(file); + } catch (RemoteException e) { + throw new FilesystemAccessException("cannot set file for input", e); + } + } + + @Override + public void setValue(String value) throws BadStateChangeException { + try { + i.setValue(value); + } catch (RemoteException e) { + throw new BadStateChangeException(e); + } + } + + @Override + public String getDelimiter() { + try { + return i.getDelimiter(); + } catch (RemoteException e) { + return null; + } + } + + @Override + public void setDelimiter(String delimiter) throws BadStateChangeException { + try { + if (delimiter != null) + delimiter = delimiter.substring(0, 1); + i.setDelimiter(delimiter); + } catch (RemoteException e) { + throw new BadStateChangeException(e); + } + } +} + +@SuppressWarnings("serial") +class SecurityContextReconstructionException extends RuntimeException { + public 
SecurityContextReconstructionException(Throwable t) { + super("failed to rebuild security context", t); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java new file mode 100644 index 0000000..0c2b1a9 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java @@ -0,0 +1,252 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY; +import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY; +import static org.taverna.server.master.worker.RunConnection.SCHEMA; +import static org.taverna.server.master.worker.RunConnection.TABLE; +import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY; +import static org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY; + +import java.io.IOException; +import java.rmi.MarshalledObject; +import java.util.Date; +import java.util.HashSet; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.jdo.annotations.Column; +import javax.jdo.annotations.Join; +import javax.jdo.annotations.PersistenceCapable; +import javax.jdo.annotations.Persistent; +import javax.jdo.annotations.PrimaryKey; +import javax.jdo.annotations.Queries; +import javax.jdo.annotations.Query; + +import org.taverna.server.localworker.remote.RemoteSingleRun; +import org.taverna.server.master.common.Credential; +import org.taverna.server.master.common.Trust; +import org.taverna.server.master.common.Workflow; +import org.taverna.server.master.interfaces.SecurityContextFactory; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * The representation of the connections to the runs that actually participates + * in the persistence system. 
+ * + * @author Donal Fellows + */ +@PersistenceCapable(table = TABLE, schema = SCHEMA) +@Queries({ + @Query(name = "count", language = "SQL", value = COUNT_QUERY, unique = "true", resultClass = Integer.class), + @Query(name = "names", language = "SQL", value = NAMES_QUERY, unique = "false", resultClass = String.class), + @Query(name = "unterminated", language = "SQL", value = UNTERMINATED_QUERY, unique = "false", resultClass = String.class), + @Query(name = "timedout", language = "SQL", value = TIMEOUT_QUERY, unique = "false", resultClass = String.class) }) +public class RunConnection { + static final String SCHEMA = "TAVERNA"; + static final String TABLE = "RUN_CONNECTION"; + private static final String FULL_NAME = SCHEMA + "." + TABLE; + static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME; + static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME; + static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME + + " WHERE expiry < CURRENT_TIMESTAMP"; + static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME + + " WHERE doneTransitionToFinished = 0"; + static final int NAME_LENGTH = 48; + + @PrimaryKey + @Column(length = 40) + private String id; + + @Persistent(defaultFetchGroup = "true") + @Column(length = NAME_LENGTH) + private String name; + + @Persistent(defaultFetchGroup = "true") + private Date creationInstant; + + @Persistent(defaultFetchGroup = "true", serialized = "true") + @Column(jdbcType = "BLOB", sqlType = "BLOB") + private Workflow workflow; + + @Persistent(defaultFetchGroup = "true") + private Date expiry; + + @Persistent(defaultFetchGroup = "true") + @Join(table = TABLE + "_READERS", column = "ID") + private String[] readers; + + @Persistent(defaultFetchGroup = "true") + @Join(table = TABLE + "_WRITERS", column = "ID") + private String[] writers; + + @Persistent(defaultFetchGroup = "true") + @Join(table = TABLE + "_DESTROYERS", column = "ID") + private String[] destroyers; + + 
@Persistent(defaultFetchGroup = "true", serialized = "true") + @Column(jdbcType = "BLOB", sqlType = "BLOB") + private MarshalledObject<RemoteSingleRun> run; + + @Persistent(defaultFetchGroup = "true") + private int doneTransitionToFinished; + + @Persistent(defaultFetchGroup = "true") + private int generateProvenance; + + @Persistent(defaultFetchGroup = "true") + @Column(length = 128) + String owner; + + @Persistent(defaultFetchGroup = "true") + @Column(length = 36) + private String securityToken; + + @Persistent(defaultFetchGroup = "true", serialized = "true") + @Column(jdbcType = "BLOB", sqlType = "BLOB") + private SecurityContextFactory securityContextFactory; + @Persistent(defaultFetchGroup = "true", serialized = "true") + @Column(jdbcType = "BLOB", sqlType = "BLOB") + private Credential[] credentials; + @Persistent(defaultFetchGroup = "true", serialized = "true") + @Column(jdbcType = "BLOB", sqlType = "BLOB") + private Trust[] trust; + + private static final String[] STRING_ARY = new String[0]; + + public String getId() { + return id; + } + + public boolean isFinished() { + return doneTransitionToFinished != 0; + } + + public void setFinished(boolean finished) { + doneTransitionToFinished = (finished ? 1 : 0); + } + + public boolean isProvenanceGenerated() { + return generateProvenance != 0; + } + + public void setProvenanceGenerated(boolean generate) { + generateProvenance = (generate ? 1 : 0); + } + + /** + * Manufacture a persistent representation of the given workflow run. Must + * be called within the context of a transaction. + * + * @param rrd + * The remote delegate of the workflow run. + * @return The persistent object. + * @throws IOException + * If serialisation fails. 
+ */ + @Nonnull + public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd) + throws IOException { + RunConnection rc = new RunConnection(); + rc.id = rrd.id; + rc.makeChanges(rrd); + return rc; + } + + private static List<String> list(String[] ary) { + if (ary == null) + return emptyList(); + return asList(ary); + } + + /** + * Get the remote run delegate for a particular persistent connection. Must + * be called within the context of a transaction. + * + * @param db + * The database facade. + * @return The delegate object. + * @throws Exception + * If anything goes wrong. + */ + @Nonnull + public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db) + throws Exception { + RemoteRunDelegate rrd = new RemoteRunDelegate(); + rrd.id = getId(); + rrd.creationInstant = creationInstant; + rrd.workflow = workflow; + rrd.expiry = expiry; + rrd.readers = new HashSet<>(list(readers)); + rrd.writers = new HashSet<>(list(writers)); + rrd.destroyers = new HashSet<>(list(destroyers)); + rrd.run = run.get(); + rrd.doneTransitionToFinished = isFinished(); + rrd.generateProvenance = isProvenanceGenerated(); + rrd.secContext = securityContextFactory.create(rrd, + new UsernamePrincipal(owner)); + ((SecurityContextDelegate)rrd.secContext).setCredentialsAndTrust(credentials,trust); + rrd.db = db; + rrd.factory = db.getFactory(); + rrd.name = name; + return rrd; + } + + /** + * Flush changes from a remote run delegate to the database. Must be called + * within the context of a transaction. + * + * @param rrd + * The remote run delegate object that has potential changes. + * @throws IOException + * If anything goes wrong in serialization. 
+ */ + public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws IOException { + // Properties that are set exactly once + if (creationInstant == null) { + creationInstant = rrd.getCreationTimestamp(); + workflow = rrd.getWorkflow(); + run = new MarshalledObject<>(rrd.run); + securityContextFactory = rrd.getSecurityContext().getFactory(); + owner = rrd.getSecurityContext().getOwner().getName(); + securityToken = ((org.taverna.server.master.worker.SecurityContextFactory) securityContextFactory) + .issueNewPassword(); + } + // Properties that are set multiple times + expiry = rrd.getExpiry(); + readers = rrd.getReaders().toArray(STRING_ARY); + writers = rrd.getWriters().toArray(STRING_ARY); + destroyers = rrd.getDestroyers().toArray(STRING_ARY); + credentials = rrd.getSecurityContext().getCredentials(); + trust = rrd.getSecurityContext().getTrusted(); + if (rrd.name.length() > NAME_LENGTH) + this.name = rrd.name.substring(0, NAME_LENGTH); + else + this.name = rrd.name; + setFinished(rrd.doneTransitionToFinished); + setProvenanceGenerated(rrd.generateProvenance); + } + + public String getSecurityToken() { + return securityToken; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java new file mode 100644 index 0000000..5fa96b8 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java @@ -0,0 +1,96 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.taverna.server.master.notification.NotificationEngine; + +/** + * The interface to the database of runs. + * + * @author Donal Fellows + */ +public interface RunDBSupport { + /** + * Scan each run to see if it has finished yet and issue registered + * notifications if it has. + */ + void checkForFinishNow(); + + /** + * Remove currently-expired runs from this database. + */ + void cleanNow(); + + /** + * How many runs are stored in the database. + * + * @return The current size of the run table. + */ + int countRuns(); + + /** + * Ensure that a run gets persisted in the database. It is assumed that the + * value is already in there. + * + * @param run + * The run to persist. + */ + void flushToDisk(@Nonnull RemoteRunDelegate run); + + /** + * Select an arbitrary representative run. + * + * @return The selected run. + * @throws Exception + * If anything goes wrong. + */ + @Nullable + RemoteRunDelegate pickArbitraryRun() throws Exception; + + /** + * Get a list of all the run names. + * + * @return The names (i.e., UUIDs) of all the runs. + */ + @Nonnull + List<String> listRunNames(); + + /** + * @param notificationEngine + * A reference to the notification fabric bean. 
+ */ + void setNotificationEngine(NotificationEngine notificationEngine); + + /** + * @param notifier + * A reference to the bean that creates messages about workflow + * run termination. + */ + void setNotifier(CompletionNotifier notifier); + + /** + * @return A reference to the actual factory for remote runs. + */ + FactoryBean getFactory(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java new file mode 100644 index 0000000..65aec70 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java @@ -0,0 +1,324 @@ +/* + */ +package org.taverna.server.master.worker; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static java.lang.Integer.parseInt; +import static java.util.UUID.randomUUID; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import javax.annotation.Nullable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Required; +import org.taverna.server.master.common.Status; +import org.taverna.server.master.exceptions.UnknownRunException; +import org.taverna.server.master.interfaces.Listener; +import org.taverna.server.master.interfaces.Policy; +import org.taverna.server.master.interfaces.RunStore; +import org.taverna.server.master.interfaces.TavernaRun; +import org.taverna.server.master.notification.NotificationEngine; +import org.taverna.server.master.notification.NotificationEngine.Message; +import org.taverna.server.master.utils.UsernamePrincipal; + +/** + * The main facade bean that interfaces to the database of runs. 
+ * + * @author Donal Fellows + */ +public class RunDatabase implements RunStore, RunDBSupport { + private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB"); + RunDatabaseDAO dao; + CompletionNotifier backupNotifier; + Map<String, CompletionNotifier> typedNotifiers; + private NotificationEngine notificationEngine; + @Autowired + private FactoryBean factory; + private Map<String, TavernaRun> cache = new HashMap<>(); + + @Override + @Required + public void setNotifier(CompletionNotifier n) { + backupNotifier = n; + } + + public void setTypeNotifiers(List<CompletionNotifier> notifiers) { + typedNotifiers = new HashMap<>(); + for (CompletionNotifier n : notifiers) + typedNotifiers.put(n.getName(), n); + } + + @Required + @Override + public void setNotificationEngine(NotificationEngine notificationEngine) { + this.notificationEngine = notificationEngine; + } + + @Required + public void setDao(RunDatabaseDAO dao) { + this.dao = dao; + } + + @Override + public void checkForFinishNow() { + /* + * Get which runs are actually newly finished; this requires getting the + * candidates from the database and *then* doing the expensive requests + * to the back end to find out the status. + */ + Map<String, RemoteRunDelegate> notifiable = new HashMap<>(); + for (RemoteRunDelegate p : dao.getPotentiallyNotifiable()) + if (p.getStatus() == Status.Finished) + notifiable.put(p.getId(), p); + + // Check if there's nothing more to do + if (notifiable.isEmpty()) + return; + + /* + * Tell the database about the ones we've got. + */ + dao.markFinished(notifiable.keySet()); + + /* + * Send out the notifications. The notification addresses are stored in + * the back-end engine, so this is *another* thing that can take time. 
+ */ + for (RemoteRunDelegate rrd : notifiable.values()) + for (Listener l : rrd.getListeners()) + if (l.getName().equals("io")) { + try { + notifyFinished(rrd.id, l, rrd); + } catch (Exception e) { + log.warn("failed to do notification of completion", e); + } + break; + } + } + + @Override + public void cleanNow() { + List<String> cleaned; + try { + cleaned = dao.doClean(); + } catch (Exception e) { + log.warn("failure during deletion of expired runs", e); + return; + } + synchronized (cache) { + for (String id : cleaned) + cache.remove(id); + } + } + + @Override + public int countRuns() { + return dao.countRuns(); + } + + @Override + public void flushToDisk(RemoteRunDelegate run) { + try { + dao.flushToDisk(run); + } catch (IOException e) { + throw new RuntimeException( + "unexpected problem when persisting run record in database", + e); + } + } + + @Override + public RemoteRunDelegate pickArbitraryRun() throws Exception { + return dao.pickArbitraryRun(); + } + + @Override + public List<String> listRunNames() { + return dao.listRunNames(); + } + + @Nullable + private TavernaRun get(String uuid) { + TavernaRun run = null; + synchronized (cache) { + run = cache.get(uuid); + } + try { + if (run != null) + run.ping(); + } catch (UnknownRunException e) { + if (log.isDebugEnabled()) + log.debug("stale mapping in cache?", e); + // Don't need to flush the cache; this happens when cleaning anyway + run = null; + } + if (run == null) + run = dao.get(uuid); + return run; + } + + @Override + public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid) + throws UnknownRunException { + // Check first to see if the 'uuid' actually looks like a UUID; if + // not, throw it out immediately without logging an exception. 
+ try { + UUID.fromString(uuid); + } catch (IllegalArgumentException e) { + if (log.isDebugEnabled()) + log.debug("run ID does not look like UUID; rejecting..."); + throw new UnknownRunException(); + } + TavernaRun run = get(uuid); + if (run != null && (user == null || p.permitAccess(user, run))) + return run; + throw new UnknownRunException(); + } + + @Override + public TavernaRun getRun(String uuid) throws UnknownRunException { + TavernaRun run = get(uuid); + if (run != null) + return run; + throw new UnknownRunException(); + } + + @Override + public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) { + synchronized (cache) { + Map<String, TavernaRun> cached = new HashMap<>(); + for (Entry<String, TavernaRun> e : cache.entrySet()) { + TavernaRun r = e.getValue(); + if (p.permitAccess(user, r)) + cached.put(e.getKey(), r); + } + if (!cached.isEmpty()) + return cached; + } + return dao.listRuns(user, p); + } + + private void logLength(String message, Object obj) { + if (!log.isDebugEnabled()) + return; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(obj); + } + log.debug(message + ": " + baos.size()); + } catch (Exception e) { + log.warn("oops", e); + } + } + + @Override + public String registerRun(TavernaRun run) { + if (!(run instanceof RemoteRunDelegate)) + throw new IllegalArgumentException( + "run must be created by localworker package"); + RemoteRunDelegate rrd = (RemoteRunDelegate) run; + if (rrd.id == null) + rrd.id = randomUUID().toString(); + logLength("RemoteRunDelegate serialized length", rrd); + try { + dao.persistRun(rrd); + } catch (IOException e) { + throw new RuntimeException( + "unexpected problem when persisting run record in database", + e); + } + synchronized (cache) { + cache.put(rrd.getId(), run); + } + return rrd.getId(); + } + + @Override + public void unregisterRun(String uuid) { + try { + if (dao.unpersistRun(uuid)) + 
synchronized (cache) { + cache.remove(uuid); + } + } catch (RuntimeException e) { + if (log.isDebugEnabled()) + log.debug("problem persisting the deletion of the run " + uuid, + e); + } + } + + /** + * Process the event that a run has finished. + * + * @param name + * The name of the run. + * @param io + * The io listener of the run (used to get information about the + * run). + * @param run + * The handle to the run. + * @throws Exception + * If anything goes wrong. + */ + private void notifyFinished(final String name, Listener io, + final RemoteRunDelegate run) throws Exception { + String to = io.getProperty("notificationAddress"); + final int code; + try { + code = parseInt(io.getProperty("exitcode")); + } catch (NumberFormatException nfe) { + // Ignore; not much we can do here... + return; + } + + notificationEngine.dispatchMessage(run, to, new Message() { + private CompletionNotifier getNotifier(String type) { + CompletionNotifier n = typedNotifiers.get(type); + if (n == null) + n = backupNotifier; + return n; + } + + @Override + public String getContent(String type) { + return getNotifier(type).makeCompletionMessage(name, run, code); + } + + @Override + public String getTitle(String type) { + return getNotifier(type).makeMessageSubject(name, run, code); + } + }); + } + + @Override + public FactoryBean getFactory() { + return factory; + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java ---------------------------------------------------------------------- diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java new file mode 100644 index 0000000..1c75d22 --- /dev/null +++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java @@ -0,0 +1,323 @@ 
/*
 */
package org.taverna.server.master.worker;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.taverna.server.master.worker.RunConnection.toDBform;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jdo.annotations.PersistenceAware;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.taverna.server.master.interfaces.Policy;
import org.taverna.server.master.interfaces.TavernaRun;
import org.taverna.server.master.utils.CallTimeLogger.PerfLogged;
import org.taverna.server.master.utils.JDOSupport;
import org.taverna.server.master.utils.UsernamePrincipal;

/**
 * This handles storing runs, interfacing with the underlying state engine as
 * necessary.
 * <p>
 * Every public operation is annotated with {@code @WithinSingleTransaction};
 * the private helpers are only called from those operations.
 *
 * @author Donal Fellows
 */
@PersistenceAware
public class RunDatabaseDAO extends JDOSupport<RunConnection> {
    public RunDatabaseDAO() {
        super(RunConnection.class);
    }

    private final Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
    private RunDatabase facade;

    /**
     * @param facade
     *            The run database facade handed to
     *            {@link RunConnection#fromDBform(RunDatabase)} whenever a
     *            stored record is turned back into a run handle.
     */
    @Required
    public void setFacade(RunDatabase facade) {
        this.facade = facade;
    }

    // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    /** @return The result of the "names" named query. */
    @SuppressWarnings("unchecked")
    private List<String> names() {
        if (log.isDebugEnabled()) {
            log.debug("fetching all run names");
        }
        return (List<String>) namedQuery("names").execute();
    }

    /**
     * @return The number of workflow runs in the database.
     */
    @WithinSingleTransaction
    public int countRuns() {
        if (log.isDebugEnabled()) {
            log.debug("counting the number of runs");
        }
        // Guard the unboxing: a null query result would otherwise surface as
        // a NullPointerException here.
        Integer total = count();
        return (total == null) ? 0 : total.intValue();
    }

    /** @return The result of the "count" named query. */
    private Integer count() {
        return (Integer) namedQuery("count").execute();
    }

    /** @return The result of the "timedout" named query. */
    @SuppressWarnings("unchecked")
    private List<String> timedout() {
        return (List<String>) namedQuery("timedout").execute();
    }

    /** @return The result of the "unterminated" named query. */
    @SuppressWarnings("unchecked")
    private List<String> unterminated() {
        return (List<String>) namedQuery("unterminated").execute();
    }

    /**
     * Look up the stored record for a run, logging the lookup and any
     * problems with it.
     *
     * @param name
     *            The identifier of the run.
     * @return The stored record, or {@code null} if the lookup found nothing
     *         (a warning is logged in that case).
     * @throws RuntimeException
     *             Whatever the lookup threw, rethrown after being logged.
     */
    @Nullable
    private RunConnection pickRun(@Nonnull String name) {
        if (log.isDebugEnabled()) {
            log.debug("fetching the run called " + name);
        }
        try {
            RunConnection rc = getById(name);
            if (rc == null) {
                log.warn("no result for " + name);
            }
            return rc;
        } catch (RuntimeException e) {
            log.warn("problem in fetch", e);
            throw e;
        }
    }

    /**
     * Get the security token stored with a run.
     *
     * @param name
     *            The identifier of the run.
     * @return The token, or {@code null} if there is no record for the run.
     */
    @Nullable
    @WithinSingleTransaction
    public String getSecurityToken(@Nonnull String name) {
        RunConnection rc = getById(name);
        if (rc == null) {
            return null;
        }
        return rc.getSecurityToken();
    }

    /** Convert the run to its database form and store it. */
    private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException {
        persist(toDBform(rrd));
    }

    /**
     * Fetch the stored records for all runs whose names the "names" query
     * reports.
     *
     * @return The records that could be fetched; never contains
     *         {@code null}. Runs whose record is missing or whose fetch
     *         failed are left out.
     */
    @Nonnull
    private List<RunConnection> allRuns() {
        try {
            List<RunConnection> rcs = new ArrayList<>();
            for (String id : names()) {
                if (id == null) {
                    continue;
                }
                try {
                    RunConnection rc = pickRun(id);
                    // pickRun is @Nullable; callers dereference every
                    // element, so a missing record must not be added.
                    if (rc != null) {
                        rcs.add(rc);
                    }
                } catch (RuntimeException e) {
                    // Already logged by pickRun; skip this run and carry on
                    // with the rest.
                    continue;
                }
            }
            return rcs;
        } catch (RuntimeException e) {
            log.warn("problem in fetch", e);
            throw e;
        }
    }

    // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

    /**
     * Obtain a workflow run handle.
     *
     * @param name
     *            The identifier of the run.
     * @return The run handle, or {@code null} if there is no such run or it
     *         could not be reconstructed from its stored form.
     */
    @Nullable
    @WithinSingleTransaction
    public TavernaRun get(String name) {
        try {
            RunConnection rc = pickRun(name);
            return (rc == null) ? null : rc.fromDBform(facade);
        } catch (Exception e) {
            // Best effort: callers treat null as "no such run". Keep a trace
            // of why so that the failure is not completely invisible.
            if (log.isDebugEnabled()) {
                log.debug("failed to obtain run handle for " + name, e);
            }
            return null;
        }
    }

    /**
     * Get the runs that a user can read things from.
     *
     * @param user
     *            Who is asking?
     * @param p
     *            The policy that determines what they can see.
     * @return A mapping from run IDs to run handles. Runs that cannot be
     *         fetched or reconstructed are omitted.
     */
    @Nonnull
    @WithinSingleTransaction
    public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
        Map<String, TavernaRun> result = new HashMap<>();
        for (String id : names()) {
            try {
                RunConnection rc = pickRun(id);
                if (rc == null) {
                    // Explicit check instead of relying on a caught
                    // NullPointerException; pickRun has already warned.
                    continue;
                }
                RemoteRunDelegate rrd = rc.fromDBform(facade);
                if (p.permitAccess(user, rrd)) {
                    result.put(id, rrd);
                }
            } catch (Exception e) {
                // One unreadable run must not hide the others.
                if (log.isDebugEnabled()) {
                    log.debug("skipping run " + id + " while listing runs", e);
                }
            }
        }
        return result;
    }

    /**
     * @return A list of the IDs for all workflow runs.
     */
    @Nonnull
    @WithinSingleTransaction
    public List<String> listRunNames() {
        List<String> runNames = new ArrayList<>();
        for (RunConnection rc : allRuns()) {
            String id = rc.getId();
            if (id != null) {
                runNames.add(id);
            }
        }
        return runNames;
    }

    /**
     * @return An arbitrary, representative workflow run (the first fetched
     *         record that has an ID), or {@code null} if there is none.
     * @throws Exception
     *             If anything goes wrong.
     */
    @Nullable
    @WithinSingleTransaction
    public RemoteRunDelegate pickArbitraryRun() throws Exception {
        for (RunConnection rc : allRuns()) {
            if (rc.getId() == null) {
                continue;
            }
            return rc.fromDBform(facade);
        }
        return null;
    }

    /**
     * Make a workflow run persistent. Must only be called once per workflow
     * run.
     *
     * @param rrd
     *            The workflow run to persist.
     * @throws IOException
     *             If anything goes wrong with serialisation of the run.
     */
    @WithinSingleTransaction
    public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException {
        persist(rrd);
    }

    /**
     * Stop a workflow run from being persistent.
     *
     * @param name
     *            The ID of the run.
     * @return Whether a deletion happened.
     */
    @WithinSingleTransaction
    public boolean unpersistRun(String name) {
        RunConnection rc = pickRun(name);
        if (rc == null) {
            return false;
        }
        delete(rc);
        return true;
    }

    /**
     * Ensure that the given workflow run is synchronized with the database.
     * <p>
     * NOTE(review): the record lookup is not null-checked, so a run without
     * a stored record fails with a {@link NullPointerException}; left as-is
     * because callers may depend on the failure being reported.
     *
     * @param run
     *            The run to synchronise.
     * @throws IOException
     *             If serialization of anything fails.
     */
    @WithinSingleTransaction
    public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException {
        getById(run.id).makeChanges(run);
    }

    /**
     * Remove all workflow runs that have expired.
     *
     * @return The ids of the deleted runs (exactly what the "timedout" query
     *         reported).
     */
    @Nonnull
    @PerfLogged
    @WithinSingleTransaction
    public List<String> doClean() {
        if (log.isDebugEnabled()) {
            log.debug("deleting runs that timed out before " + new Date());
        }
        List<String> toDelete = timedout();
        if (log.isDebugEnabled()) {
            log.debug("found " + toDelete.size() + " runs to delete");
        }
        for (String id : toDelete) {
            RunConnection rc = getById(id);
            if (rc == null) {
                // Nothing to destroy and nothing to delete for this id.
                if (log.isDebugEnabled()) {
                    log.debug("no run record found for " + id
                            + "; nothing to delete");
                }
                continue;
            }
            try {
                rc.fromDBform(facade).run.destroy();
            } catch (Exception e) {
                // Failing to tear down the execution resource must not stop
                // the database record from being removed.
                if (log.isDebugEnabled()) {
                    log.debug("failed to delete execution resource for " + id,
                            e);
                }
            }
            delete(rc);
        }
        return toDelete;
    }

    /**
     * @return A list of workflow runs that are candidates for doing
     *         notification of termination.
     */
    @Nonnull
    @PerfLogged
    @WithinSingleTransaction
    public List<RemoteRunDelegate> getPotentiallyNotifiable() {
        List<RemoteRunDelegate> toNotify = new ArrayList<>();
        for (String id : unterminated()) {
            try {
                RunConnection rc = getById(id);
                if (rc == null) {
                    log.warn("no run record found for " + id
                            + " for notification of completion check");
                    continue;
                }
                toNotify.add(rc.fromDBform(facade));
            } catch (Exception e) {
                log.warn("failed to fetch connection token "
                        + "for notification of completion check", e);
            }
        }
        return toNotify;
    }

    /**
     * Record that the given runs have finished.
     *
     * @param terminated
     *            The IDs of the runs to mark; IDs without a stored record
     *            are ignored.
     */
    @PerfLogged
    @WithinSingleTransaction
    public void markFinished(@Nonnull Set<String> terminated) {
        for (String id : terminated) {
            RunConnection rc = getById(id);
            if (rc == null) {
                continue;
            }
            try {
                // NOTE(review): the flag is set on the delegate returned by
                // fromDBform; whether that instance is retained anywhere is
                // not visible from this class.
                rc.fromDBform(facade).doneTransitionToFinished = true;
                rc.setFinished(true);
            } catch (Exception e) {
                log.warn("failed to note termination", e);
            }
        }
    }
}
