SshToolInvocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/commit/7d28decd Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/tree/7d28decd Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/diff/7d28decd Branch: refs/heads/docker Commit: 7d28decd969c7d0bd3da6bd9825dde03b735ea7b Parents: 3c48b20 Author: Stian Soiland-Reyes <[email protected]> Authored: Wed May 4 01:06:14 2016 +0100 Committer: Stian Soiland-Reyes <[email protected]> Committed: Wed May 4 01:06:14 2016 +0100 ---------------------------------------------------------------------- .../ssh/ExternalToolSshInvocationMechanism.java | 2 +- .../externaltool/ssh/SshInvocationCreator.java | 6 +- .../ssh/SshInvocationPersister.java | 6 +- .../externaltool/ssh/SshMechanismCreator.java | 2 +- .../externaltool/ssh/SshToolInvocation.java | 560 +++++++++++++++++++ .../externaltool/ssh/SshUseCaseInvocation.java | 560 ------------------- 6 files changed, 568 insertions(+), 568 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java index 94cc466..b92c62b 100644 --- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java +++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java @@ -39,7 +39,7 @@ public class ExternalToolSshInvocationMechanism extends InvocationMechanism { */ @Override public String getType() { - return SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE; + return SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE; } /** http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java index f0b1e83..8f1950e 100644 --- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java +++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java @@ -54,16 +54,16 @@ public final class SshInvocationCreator implements InvocationCreator { @Override public boolean canHandle(String mechanismType) { - return mechanismType.equals(SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE); + return mechanismType.equals(SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE); } @Override public ToolInvocation convert(InvocationMechanism m, UseCaseDescription description, Map<String, T2Reference> data, ReferenceService referenceService) { ExternalToolSshInvocationMechanism mechanism = (ExternalToolSshInvocationMechanism) m; - SshUseCaseInvocation result = null; + SshToolInvocation result = null; try { SshNode chosenNode = chooseNode(mechanism.getNodes(), data, referenceService); - result = new SshUseCaseInvocation(description, chosenNode, new RetrieveLoginFromTaverna(new SshUrl(chosenNode).toString(), credentialManager), credentialManager); + result = new SshToolInvocation(description, chosenNode, new RetrieveLoginFromTaverna(new SshUrl(chosenNode).toString(), credentialManager), credentialManager); } catch (JSchException e) { logger.error("Null invocation", e); } catch (SftpException e) { http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java index bad5c2e..93bb3e9 100644 --- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java +++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java @@ -62,7 +62,7 @@ public class SshInvocationPersister extends InvocationPersister { */ @Override public void load(File directory) { - SshUseCaseInvocation.load(directory); + SshToolInvocation.load(directory); } /* (non-Javadoc) @@ -70,13 +70,13 @@ public class SshInvocationPersister extends InvocationPersister { */ @Override public void persist(File directory) { - SshUseCaseInvocation.persist(directory); + SshToolInvocation.persist(directory); } @Override public void deleteRun(String runId) { try { - SshUseCaseInvocation.cleanup(runId, credentialManager); + SshToolInvocation.cleanup(runId, credentialManager); } catch (InvocationException e) { logger.error(e); } http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java index 097f8db..c064947 100644 --- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java +++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java @@ -41,7 +41,7 @@ public class SshMechanismCreator extends MechanismCreator { */ @Override public boolean canHandle(String mechanismType) { - return mechanismType.equals(SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE); + return mechanismType.equals(SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE); } http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java new file mode 100755 index 0000000..edf9698 --- /dev/null +++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.taverna.activities.externaltool.ssh; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.Vector; +import java.util.regex.Matcher; + +import org.apache.taverna.activities.externaltool.RetrieveLoginFromTaverna; +import org.apache.taverna.activities.externaltool.desc.ScriptInput; +import org.apache.taverna.activities.externaltool.desc.ScriptOutput; +import org.apache.taverna.activities.externaltool.desc.UseCaseDescription; +import org.apache.taverna.activities.externaltool.invocation.AskUserForPw; +import org.apache.taverna.activities.externaltool.invocation.InvocationException; +import org.apache.taverna.activities.externaltool.invocation.ToolInvocation; +import org.apache.taverna.reference.AbstractExternalReference; +import org.apache.taverna.reference.ErrorDocument; +import org.apache.taverna.reference.ErrorDocumentServiceException; +import org.apache.taverna.reference.ExternalReferenceSPI; +import org.apache.taverna.reference.Identified; +import org.apache.taverna.reference.ReferenceService; +import org.apache.taverna.reference.ReferenceSet; +import org.apache.taverna.reference.ReferencedDataNature; +import org.apache.taverna.reference.T2Reference; +import org.apache.taverna.security.credentialmanager.CredentialManager; + +import org.apache.log4j.Logger; + +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpException; +import com.jcraft.jsch.ChannelSftp.LsEntry; + +/** + * The job is executed by connecting to a worker pc using ssh, i.e. not via the + * grid. + * + * @author Hajo Krabbenhoeft + */ +public class SshToolInvocation extends ToolInvocation { + + private static Logger logger = Logger.getLogger(SshToolInvocation.class); + + private SshUrl location = null; + + private InputStream stdInputStream = null; + + public static final String SSH_USE_CASE_INVOCATION_TYPE = "D0A4CDEB-DD10-4A8E-A49C-8871003083D8"; + private String tmpname; + private final SshNode workerNode; + private final AskUserForPw askUserForPw; + + private ChannelExec running; + + private List<String> precedingCommands = new ArrayList<String>(); + + private final ByteArrayOutputStream stdout_buf = new ByteArrayOutputStream(); + private final ByteArrayOutputStream stderr_buf = new ByteArrayOutputStream(); + + private static Map<String, Object> nodeLock = Collections + .synchronizedMap(new HashMap<String, Object>()); + + private static Map<String, Set<SshUrl>> runIdToTempDir = Collections + .synchronizedMap(new HashMap<String, Set<SshUrl>>()); + + private static String SSH_INVOCATION_FILE = "sshInvocations"; + + private final CredentialManager credentialManager; + + public static String test(final SshNode workerNode, + final AskUserForPw askUserForPw) { + try { + Session sshSession = SshPool + .getSshSession(workerNode, askUserForPw); + + ChannelSftp sftpTest = (ChannelSftp) sshSession.openChannel("sftp"); + sftpTest.connect(); + sftpTest.cd(workerNode.getDirectory()); + sftpTest.disconnect(); + sshSession.disconnect(); + } catch (JSchException e) { + return e.toString(); + } catch (SftpException e) { + return e.toString(); + } + return null; + } + + public SshToolInvocation(UseCaseDescription desc, SshNode workerNodeA, + AskUserForPw askUserForPwA, CredentialManager credentialManager) + throws JSchException, SftpException { + this.workerNode = workerNodeA; + this.credentialManager = credentialManager; + + setRetrieveData(workerNodeA.isRetrieveData()); + this.askUserForPw = askUserForPwA; + usecase = desc; + + ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, askUserForPw); + synchronized (getNodeLock(workerNode)) { + + logger.info("Changing remote directory to " + + workerNode.getDirectory()); + sftp.cd(workerNode.getDirectory()); + Random rnd = new Random(); + while (true) { + tmpname = "usecase" + rnd.nextLong(); + try { + sftp.lstat(workerNode.getDirectory() + tmpname); + continue; + } catch (Exception e) { + // file seems to not exist :) + } + sftp.mkdir(workerNode.getDirectory() + tmpname); + sftp.cd(workerNode.getDirectory() + tmpname); + break; + } + } + } + + private static void recursiveDelete(ChannelSftp sftp, String path) + throws SftpException, JSchException { + Vector<?> entries = sftp.ls(path); + for (Object object : entries) { + LsEntry entry = (LsEntry) object; + if (entry.getFilename().equals(".") + || entry.getFilename().equals("..")) { + continue; + } + if (entry.getAttrs().isDir()) { + recursiveDelete(sftp, path + entry.getFilename() + "/"); + } else { + sftp.rm(path + entry.getFilename()); + } + } + sftp.rmdir(path); + } + + private static void deleteDirectory(SshUrl directory, + CredentialManager credentialManager) throws InvocationException { + URI uri; + try { + uri = new URI(directory.toString()); + + ChannelSftp sftp; + SshNode workerNode; + String fullPath = uri.getPath(); + String path = fullPath.substring(0, fullPath.lastIndexOf("/")); + String tempDir = fullPath.substring(fullPath.lastIndexOf("/")); + try { + workerNode = SshNodeFactory.getInstance().getSshNode( + uri.getHost(), uri.getPort(), path); + + sftp = SshPool.getSftpPutChannel(workerNode, + new RetrieveLoginFromTaverna(workerNode.getUrl() + .toString(), credentialManager)); + } catch (JSchException e) { + throw new InvocationException(e); + } + synchronized (getNodeLock(workerNode)) { + try { + sftp.cd(path); + recursiveDelete(sftp, path + "/" + tempDir + "/"); + } catch (SftpException e) { + throw new InvocationException(e); + } catch (JSchException e) { + throw new InvocationException(e); + } + } + } catch (URISyntaxException e1) { + throw new InvocationException(e1); + } + } + + public static void cleanup(String runId, CredentialManager credentialManager) + throws InvocationException { + Set<SshUrl> tempDirectories = runIdToTempDir.get(runId); + if (tempDirectories != null) { + for (SshUrl tempUrl : tempDirectories) { + deleteDirectory(tempUrl, credentialManager); + } + runIdToTempDir.remove(runId); + } + } + + @Override + protected void submit_generate_job_inner() throws InvocationException { + tags.put("uniqueID", "" + getSubmissionID()); + String command = usecase.getCommand(); + for (String cur : tags.keySet()) { + command = command.replaceAll("\\Q%%" + cur + "%%\\E", + Matcher.quoteReplacement(tags.get(cur))); + } + String fullCommand = "cd " + workerNode.getDirectory() + tmpname; + for (String preceding : precedingCommands) { + fullCommand += " && " + preceding; + } + fullCommand += " && " + command; + + logger.info("Full command is " + fullCommand); + + try { + running = SshPool.openExecChannel(workerNode, askUserForPw); + running.setCommand(fullCommand); + running.setOutputStream(stdout_buf); + running.setErrStream(stderr_buf); + if (stdInputStream != null) { + running.setInputStream(stdInputStream); + } + running.connect(); + } catch (JSchException e) { + throw new InvocationException(e); + } + + } + + @Override + public HashMap<String, Object> submit_wait_fetch_results( + ReferenceService referenceService) throws InvocationException { + while (!running.isClosed()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new InvocationException("Invocation interrupted:" + + e.getMessage()); + } + } + + int exitcode = running.getExitStatus(); + if (!usecase.getValidReturnCodes().contains(exitcode)) { + try { + throw new InvocationException("Invalid exit code " + exitcode + + ":" + stderr_buf.toString("US-ASCII")); + } catch (UnsupportedEncodingException e) { + throw new InvocationException("Invalid exit code " + exitcode + + ":" + stderr_buf.toString()); + } + } + + HashMap<String, Object> results = new HashMap<String, Object>(); + + results.put("STDOUT", stdout_buf.toByteArray()); + results.put("STDERR", stderr_buf.toByteArray()); + try { + stdout_buf.close(); + stderr_buf.close(); + } catch (IOException e2) { + throw new InvocationException(e2); + } + + try { + ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, + askUserForPw); + synchronized (getNodeLock(workerNode)) { + for (Map.Entry<String, ScriptOutput> cur : usecase.getOutputs() + .entrySet()) { + ScriptOutput scriptOutput = cur.getValue(); + String fullPath = workerNode.getDirectory() + tmpname + "/" + + scriptOutput.getPath(); + try { + if (sftp.stat(fullPath) != null) { + SshUrl url = new SshUrl(workerNode); + url.setSubDirectory(tmpname); + url.setFileName(scriptOutput.getPath()); + if (scriptOutput.isBinary()) { + url.setDataNature(ReferencedDataNature.BINARY); + } else { + url.setDataNature(ReferencedDataNature.TEXT); + url.setCharset("UTF-8"); + } + if (isRetrieveData()) { + SshReference urlRef = new SshReference(url); + InputStream is = urlRef.openStream(null); + AbstractExternalReference ref; + if (scriptOutput.isBinary()) { + ref = inlineByteArrayReferenceBuilder + .createReference(is, null); + } else { + ref = inlineStringReferenceBuilder + .createReference(is, null); + } + try { + is.close(); + } catch (IOException e) { + throw new InvocationException(e); + } + results.put(cur.getKey(), ref); + } else { + results.put(cur.getKey(), url); + } + } else { + ErrorDocument ed = referenceService + .getErrorDocumentService().registerError( + "No result for " + cur.getKey(), 0, + getContext()); + results.put(cur.getKey(), ed); + } + } catch (SftpException e) { + ErrorDocument ed = referenceService + .getErrorDocumentService().registerError( + "No result for " + cur.getKey(), 0, + getContext()); + results.put(cur.getKey(), ed); + + } + } + } + } catch (JSchException e1) { + throw new InvocationException(e1); + } catch (ErrorDocumentServiceException e) { + throw new InvocationException(e); + } + + if (running != null) { + running.disconnect(); + } + if (stdInputStream != null) { + try { + stdInputStream.close(); + } catch (IOException e) { + throw new InvocationException(e); + } + } + + if (isRetrieveData()) { + forgetRun(); + deleteDirectory(location, credentialManager); + + } + return results; + } + + @Override + public String setOneInput(ReferenceService referenceService, + T2Reference t2Reference, ScriptInput input) + throws InvocationException { + String target = null; + String remoteName = null; + if (input.isFile()) { + remoteName = input.getTag(); + } else if (input.isTempFile()) { + remoteName = "tempfile." + (nTempFiles++) + ".tmp"; + + } + if (input.isFile() || input.isTempFile()) { + SshReference sshRef = getAsSshReference(referenceService, + t2Reference, workerNode); + target = workerNode.getDirectory() + tmpname + "/" + remoteName; + logger.info("Target is " + target); + if (sshRef != null) { + if (!input.isForceCopy()) { + String linkCommand = workerNode.getLinkCommand(); + if (linkCommand != null) { + String actualLinkCommand = getActualOsCommand( + linkCommand, sshRef.getFullPath(), remoteName, + target); + precedingCommands.add(actualLinkCommand); + return target; + + } + } + String copyCommand = workerNode.getCopyCommand(); + if (copyCommand != null) { + String actualCopyCommand = getActualOsCommand(copyCommand, + sshRef.getFullPath(), remoteName, target); + precedingCommands.add(actualCopyCommand); + return target; + } + } + try { + ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, + askUserForPw); + synchronized (getNodeLock(workerNode)) { + InputStream r = getAsStream(referenceService, t2Reference); + sftp.put(r, target); + r.close(); + } + } catch (SftpException e) { + throw new InvocationException(e); + } catch (JSchException e) { + throw new InvocationException(e); + } catch (IOException e) { + throw new InvocationException(e); + } + return target; + } else { + String value = (String) referenceService.renderIdentifier( + t2Reference, String.class, this.getContext()); + return value; + + } + } + + public SshReference getAsSshReference(ReferenceService referenceService, + T2Reference t2Reference, SshNode workerNode) { + Identified identified = referenceService.resolveIdentifier(t2Reference, + null, null); + if (identified instanceof ReferenceSet) { + for (ExternalReferenceSPI ref : ((ReferenceSet) identified) + .getExternalReferences()) { + if (ref instanceof SshReference) { + SshReference sshRef = (SshReference) ref; + if (sshRef.getHost().equals(workerNode.getHost())) { + return sshRef; + } + } + } + } + return null; + } + + private static Object getNodeLock(final SshNode node) { + return getNodeLock(node.getHost()); + } + + private static synchronized Object getNodeLock(String hostName) { + if (!nodeLock.containsKey(hostName)) { + nodeLock.put(hostName, new Object()); + } + return nodeLock.get(hostName); + } + + @Override + public void setStdIn(ReferenceService referenceService, + T2Reference t2Reference) { + stdInputStream = new BufferedInputStream(getAsStream(referenceService, + t2Reference)); + } + + @Override + public void rememberRun(String runId) { + this.setRunId(runId); + Set<SshUrl> directories = runIdToTempDir.get(runId); + if (directories == null) { + directories = Collections.synchronizedSet(new HashSet<SshUrl>()); + runIdToTempDir.put(runId, directories); + } + location = new SshUrl(workerNode); + location.setSubDirectory(tmpname); + directories.add(location); + } + + private void forgetRun() { + Set<SshUrl> directories = runIdToTempDir.get(getRunId()); + directories.remove(location); + } + + public static void load(File directory) { + File invocationsFile = new File(directory, SSH_INVOCATION_FILE); + if (!invocationsFile.exists()) { + return; + } + BufferedReader reader = null; + try { + reader = new BufferedReader(new FileReader(invocationsFile)); + String line = reader.readLine(); + while (line != null) { + String[] parts = line.split(" "); + if (parts.length != 2) { + break; + } + String runId = parts[0]; + String urlString = parts[1]; + Set<SshUrl> urls = runIdToTempDir.get(runId); + if (urls == null) { + urls = new HashSet<SshUrl>(); + runIdToTempDir.put(runId, urls); + } + URI uri = new URI(urlString); + String fullPath = uri.getPath(); + String path = fullPath.substring(0, fullPath.lastIndexOf("/")); + String tempDir = fullPath.substring(fullPath.lastIndexOf("/")); + SshNode node = SshNodeFactory.getInstance().getSshNode( + uri.getHost(), uri.getPort(), path); + SshUrl newUrl = new SshUrl(node); + newUrl.setSubDirectory(tempDir); + urls.add(newUrl); + line = reader.readLine(); + } + } catch (FileNotFoundException e) { + logger.error(e); + } catch (URISyntaxException e) { + logger.error(e); + } catch (IOException e) { + logger.error(e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + logger.error(e); + } + } + } + } + + public static void persist(File directory) { + File invocationsFile = new File(directory, SSH_INVOCATION_FILE); + BufferedWriter writer = null; + try { + writer = new BufferedWriter(new FileWriter(invocationsFile)); + for (String runId : runIdToTempDir.keySet()) { + for (SshUrl url : runIdToTempDir.get(runId)) { + writer.write(runId); + writer.write(" "); + writer.write(url.toString()); + writer.newLine(); + } + } + } catch (IOException e) { + logger.error(e); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + logger.error(e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java ---------------------------------------------------------------------- diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java deleted file mode 100755 index 06300a8..0000000 --- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java +++ /dev/null @@ -1,560 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.taverna.activities.externaltool.ssh; - -import java.io.BufferedInputStream; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.Vector; -import java.util.regex.Matcher; - -import org.apache.taverna.activities.externaltool.RetrieveLoginFromTaverna; -import org.apache.taverna.activities.externaltool.desc.ScriptInput; -import org.apache.taverna.activities.externaltool.desc.ScriptOutput; -import org.apache.taverna.activities.externaltool.desc.UseCaseDescription; -import org.apache.taverna.activities.externaltool.invocation.AskUserForPw; -import org.apache.taverna.activities.externaltool.invocation.InvocationException; -import org.apache.taverna.activities.externaltool.invocation.ToolInvocation; -import org.apache.taverna.reference.AbstractExternalReference; -import org.apache.taverna.reference.ErrorDocument; -import org.apache.taverna.reference.ErrorDocumentServiceException; -import org.apache.taverna.reference.ExternalReferenceSPI; -import org.apache.taverna.reference.Identified; -import org.apache.taverna.reference.ReferenceService; -import org.apache.taverna.reference.ReferenceSet; -import org.apache.taverna.reference.ReferencedDataNature; -import org.apache.taverna.reference.T2Reference; -import org.apache.taverna.security.credentialmanager.CredentialManager; - -import org.apache.log4j.Logger; - -import com.jcraft.jsch.ChannelExec; -import com.jcraft.jsch.ChannelSftp; -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.Session; -import com.jcraft.jsch.SftpException; -import com.jcraft.jsch.ChannelSftp.LsEntry; - -/** - * The job is executed by connecting to a worker pc using ssh, i.e. not via the - * grid. - * - * @author Hajo Krabbenhoeft - */ -public class SshUseCaseInvocation extends ToolInvocation { - - private static Logger logger = Logger.getLogger(SshUseCaseInvocation.class); - - private SshUrl location = null; - - private InputStream stdInputStream = null; - - public static final String SSH_USE_CASE_INVOCATION_TYPE = "D0A4CDEB-DD10-4A8E-A49C-8871003083D8"; - private String tmpname; - private final SshNode workerNode; - private final AskUserForPw askUserForPw; - - private ChannelExec running; - - private List<String> precedingCommands = new ArrayList<String>(); - - private final ByteArrayOutputStream stdout_buf = new ByteArrayOutputStream(); - private final ByteArrayOutputStream stderr_buf = new ByteArrayOutputStream(); - - private static Map<String, Object> nodeLock = Collections - .synchronizedMap(new HashMap<String, Object>()); - - private static Map<String, Set<SshUrl>> runIdToTempDir = Collections - .synchronizedMap(new HashMap<String, Set<SshUrl>>()); - - private static String SSH_INVOCATION_FILE = "sshInvocations"; - - private final CredentialManager credentialManager; - - public static String test(final SshNode workerNode, - final AskUserForPw askUserForPw) { - try { - Session sshSession = SshPool - .getSshSession(workerNode, askUserForPw); - - ChannelSftp sftpTest = (ChannelSftp) sshSession.openChannel("sftp"); - sftpTest.connect(); - sftpTest.cd(workerNode.getDirectory()); - sftpTest.disconnect(); - sshSession.disconnect(); - } catch (JSchException e) { - return e.toString(); - } catch (SftpException e) { - return e.toString(); - } - return null; - } - - public SshUseCaseInvocation(UseCaseDescription desc, SshNode workerNodeA, - AskUserForPw askUserForPwA, CredentialManager credentialManager) - throws JSchException, SftpException { - this.workerNode = workerNodeA; - this.credentialManager = credentialManager; - - setRetrieveData(workerNodeA.isRetrieveData()); - this.askUserForPw = askUserForPwA; - usecase = desc; - - ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, askUserForPw); - synchronized (getNodeLock(workerNode)) { - - logger.info("Changing remote directory to " - + workerNode.getDirectory()); - sftp.cd(workerNode.getDirectory()); - Random rnd = new Random(); - while (true) { - tmpname = "usecase" + rnd.nextLong(); - try { - sftp.lstat(workerNode.getDirectory() + tmpname); - continue; - } catch (Exception e) { - // file seems to not exist :) - } - sftp.mkdir(workerNode.getDirectory() + tmpname); - sftp.cd(workerNode.getDirectory() + tmpname); - break; - } - } - } - - private static void recursiveDelete(ChannelSftp sftp, String path) - throws SftpException, JSchException { - Vector<?> entries = sftp.ls(path); - for (Object object : entries) { - LsEntry entry = (LsEntry) object; - if (entry.getFilename().equals(".") - || entry.getFilename().equals("..")) { - continue; - } - if (entry.getAttrs().isDir()) { - recursiveDelete(sftp, path + entry.getFilename() + "/"); - } else { - sftp.rm(path + entry.getFilename()); - } - } - sftp.rmdir(path); - } - - private static void deleteDirectory(SshUrl directory, - CredentialManager credentialManager) throws InvocationException { - URI uri; - try { - uri = new URI(directory.toString()); - - ChannelSftp sftp; - SshNode workerNode; - String fullPath = uri.getPath(); - String path = fullPath.substring(0, fullPath.lastIndexOf("/")); - String tempDir = fullPath.substring(fullPath.lastIndexOf("/")); - try { - workerNode = SshNodeFactory.getInstance().getSshNode( - uri.getHost(), uri.getPort(), path); - - sftp = SshPool.getSftpPutChannel(workerNode, - new RetrieveLoginFromTaverna(workerNode.getUrl() - .toString(), credentialManager)); - } catch (JSchException e) { - throw new InvocationException(e); - } - synchronized (getNodeLock(workerNode)) { - try { - sftp.cd(path); - recursiveDelete(sftp, path + "/" + tempDir + "/"); - } catch (SftpException e) { - throw new InvocationException(e); - } catch (JSchException e) { - throw new InvocationException(e); - } - } - } catch (URISyntaxException e1) { - throw new InvocationException(e1); - } - } - - public static void cleanup(String runId, CredentialManager credentialManager) - throws InvocationException { - Set<SshUrl> tempDirectories = runIdToTempDir.get(runId); - if (tempDirectories != null) { - for (SshUrl tempUrl : tempDirectories) { - deleteDirectory(tempUrl, credentialManager); - } - runIdToTempDir.remove(runId); - } - } - - @Override - protected void submit_generate_job_inner() throws InvocationException { - tags.put("uniqueID", "" + getSubmissionID()); - String command = usecase.getCommand(); - for (String cur : tags.keySet()) { - command = command.replaceAll("\\Q%%" + cur + "%%\\E", - Matcher.quoteReplacement(tags.get(cur))); - } - String fullCommand = "cd " + workerNode.getDirectory() + tmpname; - for (String preceding : precedingCommands) { - fullCommand += " && " + preceding; - } - fullCommand += " && " + command; - - logger.info("Full command is " + fullCommand); - - try { - running = SshPool.openExecChannel(workerNode, askUserForPw); - running.setCommand(fullCommand); - running.setOutputStream(stdout_buf); - running.setErrStream(stderr_buf); - if (stdInputStream != null) { - running.setInputStream(stdInputStream); - } - running.connect(); - } catch (JSchException e) { - throw new InvocationException(e); - } - - } - - @Override - public HashMap<String, Object> submit_wait_fetch_results( - ReferenceService referenceService) throws InvocationException { - while (!running.isClosed()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new InvocationException("Invocation interrupted:" - + e.getMessage()); - } - } - - int exitcode = running.getExitStatus(); - if (!usecase.getValidReturnCodes().contains(exitcode)) { - try { - throw new InvocationException("Invalid exit code " + exitcode - + ":" + stderr_buf.toString("US-ASCII")); - } catch (UnsupportedEncodingException e) { - throw new InvocationException("Invalid exit code " + exitcode - + ":" + stderr_buf.toString()); - } - } - - HashMap<String, Object> results = new HashMap<String, Object>(); - - results.put("STDOUT", stdout_buf.toByteArray()); - results.put("STDERR", stderr_buf.toByteArray()); - try { - stdout_buf.close(); - stderr_buf.close(); - } catch (IOException e2) { - throw new InvocationException(e2); - } - - try { - ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, - askUserForPw); - synchronized (getNodeLock(workerNode)) { - for (Map.Entry<String, ScriptOutput> cur : usecase.getOutputs() - .entrySet()) { - ScriptOutput scriptOutput = cur.getValue(); - String fullPath = workerNode.getDirectory() + tmpname + "/" - + scriptOutput.getPath(); - try { - if (sftp.stat(fullPath) != null) { - SshUrl url = new SshUrl(workerNode); - url.setSubDirectory(tmpname); - url.setFileName(scriptOutput.getPath()); - if (scriptOutput.isBinary()) { - url.setDataNature(ReferencedDataNature.BINARY); - } else { - url.setDataNature(ReferencedDataNature.TEXT); - url.setCharset("UTF-8"); - } - if (isRetrieveData()) { - SshReference urlRef = new SshReference(url); - InputStream is = urlRef.openStream(null); - AbstractExternalReference ref; - if (scriptOutput.isBinary()) { - ref = inlineByteArrayReferenceBuilder - .createReference(is, null); - } else { - ref = inlineStringReferenceBuilder - .createReference(is, null); - } - try { - is.close(); - } catch (IOException e) { - throw new InvocationException(e); - } - results.put(cur.getKey(), ref); - } else { - results.put(cur.getKey(), url); - } - } else { - ErrorDocument ed = referenceService - .getErrorDocumentService().registerError( - "No result for " + cur.getKey(), 0, - getContext()); - results.put(cur.getKey(), ed); - } - } catch (SftpException e) { - ErrorDocument ed = referenceService - .getErrorDocumentService().registerError( - "No result for " + cur.getKey(), 0, - getContext()); - results.put(cur.getKey(), ed); - - } - } - } - } catch (JSchException e1) { - throw new InvocationException(e1); - } catch (ErrorDocumentServiceException e) { - throw new InvocationException(e); - } - - if (running != null) { - running.disconnect(); - } - if (stdInputStream != null) { - try { - stdInputStream.close(); - } catch (IOException e) { - throw new InvocationException(e); - } - } - - if (isRetrieveData()) { - forgetRun(); - deleteDirectory(location, credentialManager); - - } - return results; - } - - @Override - public String setOneInput(ReferenceService referenceService, - T2Reference t2Reference, ScriptInput input) - throws InvocationException { - String target = null; - String remoteName = null; - if (input.isFile()) { - remoteName = input.getTag(); - } else if (input.isTempFile()) { - remoteName = "tempfile." + (nTempFiles++) + ".tmp"; - - } - if (input.isFile() || input.isTempFile()) { - SshReference sshRef = getAsSshReference(referenceService, - t2Reference, workerNode); - target = workerNode.getDirectory() + tmpname + "/" + remoteName; - logger.info("Target is " + target); - if (sshRef != null) { - if (!input.isForceCopy()) { - String linkCommand = workerNode.getLinkCommand(); - if (linkCommand != null) { - String actualLinkCommand = getActualOsCommand( - linkCommand, sshRef.getFullPath(), remoteName, - target); - precedingCommands.add(actualLinkCommand); - return target; - - } - } - String copyCommand = workerNode.getCopyCommand(); - if (copyCommand != null) { - String actualCopyCommand = getActualOsCommand(copyCommand, - sshRef.getFullPath(), remoteName, target); - precedingCommands.add(actualCopyCommand); - return target; - } - } - try { - ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, - askUserForPw); - synchronized (getNodeLock(workerNode)) { - InputStream r = getAsStream(referenceService, t2Reference); - sftp.put(r, target); - r.close(); - } - } catch (SftpException e) { - throw new InvocationException(e); - } catch (JSchException e) { - throw new InvocationException(e); - } catch (IOException e) { - throw new InvocationException(e); - } - return target; - } else { - String value = (String) referenceService.renderIdentifier( - t2Reference, String.class, this.getContext()); - return value; - - } - } - - public SshReference getAsSshReference(ReferenceService referenceService, - T2Reference t2Reference, SshNode workerNode) { - Identified identified = referenceService.resolveIdentifier(t2Reference, - null, null); - if (identified instanceof ReferenceSet) { - for (ExternalReferenceSPI ref : ((ReferenceSet) identified) - .getExternalReferences()) { - if (ref instanceof SshReference) { - SshReference sshRef = (SshReference) ref; - if (sshRef.getHost().equals(workerNode.getHost())) { - return sshRef; - } - } - } - } - return null; - } - - private static Object getNodeLock(final SshNode node) { - return getNodeLock(node.getHost()); - } - - private static synchronized Object getNodeLock(String hostName) { - if (!nodeLock.containsKey(hostName)) { - nodeLock.put(hostName, new Object()); - } - return nodeLock.get(hostName); - } - - @Override - public void setStdIn(ReferenceService referenceService, - T2Reference t2Reference) { - stdInputStream = new BufferedInputStream(getAsStream(referenceService, - t2Reference)); - } - - @Override - public void rememberRun(String runId) { - this.setRunId(runId); - Set<SshUrl> directories = runIdToTempDir.get(runId); - if (directories == null) { - directories = Collections.synchronizedSet(new HashSet<SshUrl>()); - runIdToTempDir.put(runId, directories); - } - location = new SshUrl(workerNode); - location.setSubDirectory(tmpname); - directories.add(location); - } - - private void forgetRun() { - Set<SshUrl> directories = runIdToTempDir.get(getRunId()); - directories.remove(location); - } - - public static void load(File directory) { - File invocationsFile = new File(directory, SSH_INVOCATION_FILE); - if (!invocationsFile.exists()) { - return; - } - BufferedReader reader = null; - try { - reader = new BufferedReader(new FileReader(invocationsFile)); - String line = reader.readLine(); - while (line != null) { - String[] parts = line.split(" "); - if (parts.length != 2) { - break; - } - String runId = parts[0]; - String urlString = parts[1]; - Set<SshUrl> urls = runIdToTempDir.get(runId); - if (urls == null) { - urls = new HashSet<SshUrl>(); - runIdToTempDir.put(runId, urls); - } - URI uri = new URI(urlString); - String fullPath = uri.getPath(); - String path = fullPath.substring(0, fullPath.lastIndexOf("/")); - String tempDir = fullPath.substring(fullPath.lastIndexOf("/")); - SshNode node = SshNodeFactory.getInstance().getSshNode( - uri.getHost(), uri.getPort(), path); - SshUrl newUrl = new SshUrl(node); - newUrl.setSubDirectory(tempDir); - urls.add(newUrl); - line = reader.readLine(); - } - } catch (FileNotFoundException e) { - logger.error(e); - } catch (URISyntaxException e) { - logger.error(e); - } catch (IOException e) { - logger.error(e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - logger.error(e); - } - } - } - } - - public static void persist(File directory) { - File invocationsFile = new File(directory, SSH_INVOCATION_FILE); - BufferedWriter writer = null; - try { - writer = new BufferedWriter(new FileWriter(invocationsFile)); - for (String runId : runIdToTempDir.keySet()) { - for (SshUrl url : runIdToTempDir.get(runId)) { - writer.write(runId); - writer.write(" "); - writer.write(url.toString()); - writer.newLine(); - } - } - } catch (IOException e) { - logger.error(e); - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException e) { - logger.error(e); - } - } - } - } -}
