Repository: airavata Updated Branches: refs/heads/feature-workload-mgmt 9f0e45b25 -> d231956e8
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java new file mode 100644 index 0000000..adacb21 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java @@ -0,0 +1,195 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import de.fzj.unicore.uas.security.ProxyCertOutHandler; +import eu.emi.security.authn.x509.X509Credential; +import eu.emi.security.authn.x509.impl.KeyAndCertCredential; +import eu.emi.security.authn.x509.impl.X500NameUtils; +import eu.unicore.util.httpclient.DefaultClientConfiguration; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.model.experiment.UserConfigurationDataModel; +import org.apache.airavata.worker.core.RequestData; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.bouncycastle.asn1.x500.style.BCStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +public class UNICORESecurityContext extends X509SecurityContext { + + private static final long serialVersionUID = 1L; + protected static final Logger log = LoggerFactory.getLogger(UNICORESecurityContext.class); + private DefaultClientConfiguration secProperties; + + + public UNICORESecurityContext(CredentialReader credentialReader, RequestData requestData) { + super(credentialReader, requestData); + } + + + + public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging) throws WorkerException, ApplicationSettingsException { + try{ + X509Credential cred = getX509Credentials(); + secProperties = new DefaultClientConfiguration(dcValidator, cred); + setExtraSettings(); + } + catch (Exception e) { + throw new WorkerException(e.getMessage(), e); + } + if(enableMessageLogging) secProperties.setMessageLogging(true); + + return secProperties; + } + + public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging, + UserConfigurationDataModel userDataModel) + throws WorkerException, ApplicationSettingsException { + + X509Credential cred = null; + try{ + boolean genCert = userDataModel.isGenerateCert(); + if(genCert) { + String userDN = userDataModel.getUserDN(); + if (userDN == null || "".equals(userDN)){ + log.warn("Cannot generate cert, falling back to GFAC configured MyProxy credentials"); + return getDefaultConfiguration(enableMessageLogging); + } + else { + log.info("Generating X.509 certificate for: "+userDN); + try { + String caCertPath = ServerSettings.getSetting(BESConstants.PROP_CA_CERT_PATH, ""); + String caKeyPath = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PATH, ""); + String caKeyPass = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PASS, ""); + + if(caCertPath.equals("") || caKeyPath.equals("")) { + throw new Exception("CA certificate or key file path missing in the properties file. " + + "Please make sure " + BESConstants.PROP_CA_CERT_PATH + " or " + + BESConstants.PROP_CA_KEY_PATH + " are not empty."); + } + + if("".equals(caKeyPass)) { + log.warn("Caution: CA key has no password. For security reasons it is highly recommended to set a CA key password"); + } + cred = generateShortLivedCredential(userDN, caCertPath, caKeyPath, caKeyPass); + }catch (Exception e){ + throw new WorkerException("Error occured while generating a short lived credential for user:"+userDN, e); + } + + } + }else { + return getDefaultConfiguration(enableMessageLogging); + } + + secProperties = new DefaultClientConfiguration(dcValidator, cred); + setExtraSettings(); + } + catch (Exception e) { + throw new WorkerException(e.getMessage(), e); + } + secProperties.getETDSettings().setExtendTrustDelegation(true); + if(enableMessageLogging) secProperties.setMessageLogging(true); +// secProperties.setDoSignMessage(true); + secProperties.getETDSettings() + .setIssuerCertificateChain(secProperties.getCredential().getCertificateChain()); + + return secProperties; + } + + + /** + * Get server signed credentials. Each time it is invoked new certificate + * is returned. + * + * @param userID + * @param userDN + * @param caCertPath + * @param caKeyPath + * @param caKeyPwd + * @return + * @throws WorkerException + */ + public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath, String caKeyPath, String caKeyPwd) throws WorkerException { + try { + KeyAndCertCredential cred = SecurityUtils.generateShortLivedCertificate(userDN,caCertPath,caKeyPath,caKeyPwd); + secProperties = new DefaultClientConfiguration(dcValidator, cred); + setExtraSettings(); + } catch (Exception e) { + throw new WorkerException(e.getMessage(), e); + } + + return secProperties; + } + + + + + + private void setExtraSettings(){ + secProperties.getETDSettings().setExtendTrustDelegation(true); + + secProperties.setDoSignMessage(true); + + String[] outHandlers = secProperties.getOutHandlerClassNames(); + + Set<String> outHandlerLst = null; + + // timeout in milliseconds + Properties p = secProperties.getExtraSettings(); + + if(p == null) { + p = new Properties(); + } + + p.setProperty("http.connection.timeout", "5000"); + p.setProperty("http.socket.timeout", "5000"); + + if (outHandlers == null) { + outHandlerLst = new HashSet<String>(); + } else { + outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers)); + } + + outHandlerLst.add(ProxyCertOutHandler.class.getName()); + + secProperties.setOutHandlerClassNames(outHandlerLst + .toArray(new String[outHandlerLst.size()])); + + secProperties.getETDSettings().setExtendTrustDelegation(true); + + } + + + private String getCNFromUserDN(String userDN) { + return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0]; + + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java new file mode 100644 index 0000000..1f83370 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + + +import org.apache.commons.httpclient.URI; +import org.apache.commons.httpclient.URIException; +import org.apache.commons.httpclient.util.URIUtil; + +import java.net.URISyntaxException; + +public class URIUtils { + + public static String encodeAll(String uri) throws URIException + { + String result = encodeAuthority(uri); + result = encodePath(uri); + result = encodeQuery(result ); + result = encodeFragment(result ); + return result; + } + + public static String encodeAuthority(String uri) throws URIException + { + int start = uri.indexOf("//"); + if(start == -1) return uri; + start++; + int end = uri.indexOf("/",start+1); + if(end == -1) end = uri.indexOf("?",start+1); + if(end == -1) end = uri.indexOf("#",start+1); + if(end == -1) end = uri.length(); + String before = uri.substring(0, start+1); + String authority= uri.substring(start+1,end); + String after = uri.substring(end); + authority = URIUtil.encode(authority, URI.allowed_authority); + + return before+authority+after; + } + + public static String encodePath(String uri) throws URIException + { + int doubleSlashIndex = uri.indexOf("//"); + boolean hasAuthority = doubleSlashIndex >= 0; + int start = -1; + if(hasAuthority) + { + start = uri.indexOf("/",doubleSlashIndex+2); + } + else + { + start = uri.indexOf(":"); + } + if(start == -1) return uri; + + int end = uri.indexOf("?",start+1); + if(end == -1) end = uri.indexOf("#",start+1); + if(end == -1) end = uri.length(); + String before = uri.substring(0, start+1); + String path= uri.substring(start+1,end); + String after = uri.substring(end); + path = URIUtil.encode(path, URI.allowed_abs_path); + return before+path+after; + } + + + public static String encodeQuery(String uri) throws URIException + { + int queryStart = uri.indexOf("?"); + if(queryStart == -1) return uri; + int queryEnd = uri.indexOf("#"); + if(queryEnd == -1) queryEnd = uri.length(); + + String beforeQuery = uri.substring(0, queryStart+1); + String query = uri.substring(queryStart+1,queryEnd); + String afterQuery = uri.substring(queryEnd); + query = URIUtil.encode(query, URI.allowed_query); + return beforeQuery+query+afterQuery; + } + + + public static String encodeFragment(String uri) throws URIException + { + int fragmentStart = uri.indexOf("#"); + if(fragmentStart == -1) return uri; + + String beforeFragment = uri.substring(0, fragmentStart+1); + String fragment = uri.substring(fragmentStart+1); + fragment = URIUtil.encode(fragment, URI.allowed_fragment); + return beforeFragment+fragment; + } + + public static java.net.URI createGsiftpURI(String host, String localPath) throws URISyntaxException { + StringBuffer buf = new StringBuffer(); + if (!host.startsWith("gsiftp://")) + buf.append("gsiftp://"); + buf.append(host); + if (!host.endsWith("/")) + buf.append("/"); + buf.append(localPath); + return new java.net.URI(buf.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java new file mode 100644 index 0000000..5be5bc1 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + + +import eu.emi.security.authn.x509.X509Credential; +import eu.emi.security.authn.x509.helpers.CertificateHelpers; +import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder; +import eu.emi.security.authn.x509.impl.CertificateUtils; +import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding; +import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator; +import eu.emi.security.authn.x509.impl.KeyAndCertCredential; +import eu.emi.security.authn.x509.impl.X500NameUtils; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.worker.core.RequestData; +import org.apache.airavata.worker.core.context.AbstractSecurityContext; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.apache.cxf.interceptor.security.AbstractSecurityContextInInterceptor; +import org.bouncycastle.asn1.ASN1InputStream; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x500.style.BCStyle; +import org.bouncycastle.asn1.x509.AlgorithmIdentifier; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.x500.X500Principal; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.security.InvalidKeyException; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +/** + * Handles X509 Certificate based security. + */ +public class X509SecurityContext extends AbstractSecurityContext { + + private static final long serialVersionUID = 1L; + + protected static final Logger log = LoggerFactory.getLogger(X509SecurityContext.class); + + /* + * context name + */ + public static final String X509_SECURITY_CONTEXT = "x509.security.context"; + + public static final int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90; + + protected static DirectoryCertChainValidator dcValidator; + + private X509Credential x509Credentials= null; + + static { + try { + setUpTrustedCertificatePath(); + // set up directory based trust validator + dcValidator = getTrustedCerts(); + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + } + } + + + public static void setUpTrustedCertificatePath(String trustedCertificatePath) { + + File file = new File(trustedCertificatePath); + + if (!file.exists() || !file.canRead()) { + File f = new File("."); + log.info("Current directory " + f.getAbsolutePath()); + throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath); + } else { + System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath()); + } + } + + private static void setUpTrustedCertificatePath() throws ApplicationSettingsException { + + String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION); + + setUpTrustedCertificatePath(trustedCertificatePath); + } + + /** + * Gets the trusted certificate path. Trusted certificate path is stored in "X509_CERT_DIR" + * system property. + * @return The trusted certificate path as a string. + */ + public static String getTrustedCertificatePath() { + return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY); + } + + + public X509SecurityContext(CredentialReader credentialReader, RequestData requestData) { + super(credentialReader, requestData); + } + + + /** + * Gets X509Credentials. The process is as follows; + * If credentials were queried for the first time create credentials. + * 1. Try creating credentials using certificates stored in the credential store + * 2. If 1 fails use user name and password to create credentials + * @return x509credentials (from CANL security API) + * @throws ApplicationSettingsException + */ + public X509Credential getX509Credentials() throws WorkerException, ApplicationSettingsException { + + if(getCredentialReader() == null) { + return getDefaultCredentials(); + } + + if (x509Credentials == null) { + + try { + x509Credentials = getCredentialsFromStore(); + } catch (Exception e) { + log.error("An exception occurred while retrieving credentials from the credential store. " + + "Will continue with my proxy user name and password.", e); + } + + // If store does not have credentials try to get from user name and password + if (x509Credentials == null) { + x509Credentials = getDefaultCredentials(); + } + + // if still null, throw an exception + if (x509Credentials == null) { + throw new WorkerException("Unable to retrieve my proxy credentials to continue operation."); + } + } else { + try { + + final long remainingTime = x509Credentials.getCertificate().getNotAfter().getTime() - new Date().getTime(); + + if (remainingTime < CREDENTIAL_RENEWING_THRESH_HOLD) { + // return renewCredentials(); + log.warn("Do not support credentials renewal"); + } + + log.info("Fall back to get new default credentials"); + + try { + x509Credentials.getCertificate().checkValidity(); + }catch(Exception e){ + x509Credentials = getDefaultCredentials(); + } + + } catch (Exception e) { + throw new WorkerException("Unable to retrieve remaining life time from credentials.", e); + } + } + + return x509Credentials; + } + + /** + * Reads the credentials from credential store. + * @return If token is found in the credential store, will return a valid credential. Else returns null. + * @throws Exception If an error occurred while retrieving credentials. + */ + public X509Credential getCredentialsFromStore() throws Exception { + + if (getCredentialReader() == null) { + return null; + } + + Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(), + getRequestData().getTokenId()); + + if (credential != null) { + if (credential instanceof CertificateCredential) { + + log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() + + " gateway id - " + getRequestData().getGatewayId()); + + CertificateCredential certificateCredential = (CertificateCredential) credential; + + X509Certificate[] certificates = certificateCredential.getCertificates(); + + KeyAndCertCredential keyAndCert = new KeyAndCertCredential(certificateCredential.getPrivateKey(), certificates); + + return keyAndCert; + //return new GlobusGSSCredentialImpl(newCredential, + // GSSCredential.INITIATE_AND_ACCEPT); + } else { + log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " + + "Credential type - " + credential.getClass().getName()); + } + } else { + log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and " + + "gateway id - " + getRequestData().getGatewayId()); + } + + return null; + } + + /** + * Gets the default proxy certificate. + * @return Default my proxy credentials. + * @throws ApplicationSettingsException + */ + public X509Credential getDefaultCredentials() throws WorkerException, ApplicationSettingsException { + MyProxyLogon logon = new MyProxyLogon(); + logon.setValidator(dcValidator); + logon.setHost(getRequestData().getMyProxyServerUrl()); + logon.setPort(getRequestData().getMyProxyPort()); + logon.setUsername(getRequestData().getMyProxyUserName()); + logon.setPassphrase(getRequestData().getMyProxyPassword().toCharArray()); + logon.setLifetime(getRequestData().getMyProxyLifeTime()); + + try { + logon.connect(); + logon.logon(); + logon.getCredentials(); + logon.disconnect(); + PrivateKey pk=logon.getPrivateKey(); + return new KeyAndCertCredential(pk, new X509Certificate[]{logon.getCertificate()}); + } catch (Exception e) { + throw new WorkerException("An error occurred while retrieving default security credentials.", e); + } + } + + private static DirectoryCertChainValidator getTrustedCerts() throws Exception{ + String certLocation = getTrustedCertificatePath(); + List<String> trustedCert = new ArrayList<String>(); + trustedCert.add(certLocation + "/*.0"); + trustedCert.add(certLocation + "/*.pem"); + DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1, 60000, null); + return dcValidator; + } + + private String getCNFromUserDN(String userDN) { + return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0]; + } + + public KeyAndCertCredential generateShortLivedCredential(String userDN, String caCertPath, String caKeyPath, + String caPwd) throws Exception { + final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes + // ago + + final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset; + final long endTime = startTime + 30 * 3600 * 1000; + + String keyLengthProp = "1024"; + int keyLength = Integer.parseInt(keyLengthProp); + String signatureAlgorithm = "SHA1withRSA"; + + KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd); + + KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm()); + kpg.initialize(keyLength); + KeyPair pair = kpg.generateKeyPair(); + + X500Principal subjectDN = new X500Principal(userDN); + Random rand = new Random(); + + SubjectPublicKeyInfo publicKeyInfo; + try { + publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded()) + .readObject()); + } catch (IOException e) { + throw new InvalidKeyException("Can not parse the public key" + + "being included in the short lived certificate", e); + } + + X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal()); + + X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN); + + X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand), + new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo); + + AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate()); + + X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null); + + certificate.checkValidity(new Date()); + certificate.verify(caCred.getCertificate().getPublicKey()); + KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate, + caCred.getCertificate() }); + + return result; + } + + private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception { + InputStream isKey, isCert; + isKey = isCert = null; + try { + isKey = new FileInputStream(caKeyPath); + PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray()); + + isCert = new FileInputStream(caCertPath); + X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM); + return new KeyAndCertCredential(pk, new X509Certificate[] { caCert }); + } finally { + if (isKey != null){ + isKey.close(); + } + if (isCert != null) { + isCert.close(); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/pom.xml b/modules/worker/worker-core/pom.xml index 8f00821..32d37d1 100644 --- a/modules/worker/worker-core/pom.xml +++ b/modules/worker/worker-core/pom.xml @@ -63,6 +63,12 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <!-- zookeeper dependencies --> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java new file mode 100644 index 0000000..8a961c5 --- /dev/null +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java @@ -0,0 +1,149 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.worker.core; + +/** + * User: AmilaJ ([email protected]) + * Date: 6/28/13 + * Time: 3:28 PM + */ + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.worker.core.utils.WorkerConstants; + +/** + * Encapsulates GFac specific data that are coming in the request. + */ +public class RequestData { + + private static final int DEFAULT_LIFE_TIME = 3600; + private static final int DEFAULT_MY_PROXY_PORT = 7512; + + private String tokenId; + private String requestUser; + private String gatewayId; + + private String myProxyServerUrl = null; + private int myProxyPort = 0; + private String myProxyUserName = null; + private String myProxyPassword = null; + private int myProxyLifeTime = DEFAULT_LIFE_TIME; + + + + + public RequestData() { + } + + public RequestData(String gatewayId) { + this.gatewayId = gatewayId; + } + + public RequestData(String tokenId, String requestUser, String gatewayId) { + this.tokenId = tokenId; + this.requestUser = requestUser; + this.gatewayId = gatewayId; + } + + public String getTokenId() { + return tokenId; + } + + public void setTokenId(String tokenId) { + this.tokenId = tokenId; + } + + public String getRequestUser() { + return requestUser; + } + + public void setRequestUser(String requestUser) { + this.requestUser = requestUser; + } + + public String getGatewayId() { + return gatewayId; + } + + public void setGatewayId(String gatewayId) { + this.gatewayId = gatewayId; + } + + public String getMyProxyServerUrl() throws ApplicationSettingsException { + if (myProxyServerUrl == null) { + myProxyServerUrl = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER); + } + return myProxyServerUrl; + } + + public void setMyProxyServerUrl(String myProxyServerUrl) { + this.myProxyServerUrl = myProxyServerUrl; + } + + public int getMyProxyPort() { + + if (myProxyPort == 0) { + String sPort = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT)); + myProxyPort = Integer.parseInt(sPort); + } + + return myProxyPort; + } + + public void setMyProxyPort(int myProxyPort) { + this.myProxyPort = myProxyPort; + } + + public String getMyProxyUserName() throws ApplicationSettingsException { + if (myProxyUserName == null) { + myProxyUserName = ServerSettings.getSetting(WorkerConstants.MYPROXY_USER); + } + + return myProxyUserName; + } + + public void setMyProxyUserName(String myProxyUserName) { + this.myProxyUserName = myProxyUserName; + } + + public String getMyProxyPassword() throws ApplicationSettingsException { + + if (myProxyPassword == null) { + myProxyPassword = ServerSettings.getSetting(WorkerConstants.MYPROXY_PASS); + } + + return myProxyPassword; + } + + public int getMyProxyLifeTime() { + String life = ServerSettings.getSetting(WorkerConstants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime)); + myProxyLifeTime = Integer.parseInt(life); + return myProxyLifeTime; + } + + public void setMyProxyLifeTime(int myProxyLifeTime) { + this.myProxyLifeTime = myProxyLifeTime; + } + + public void setMyProxyPassword(String myProxyPassword) { + this.myProxyPassword = myProxyPassword; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java new file mode 100644 index 0000000..aec1d83 --- /dev/null +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java @@ -0,0 +1,24 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.worker.core; + +public interface SecurityContext { + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java new file mode 100644 index 0000000..c4798e2 --- /dev/null +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.worker.core.context; + +/** + * User: AmilaJ ([email protected]) + * Date: 6/26/13 + * Time: 4:33 PM + */ + +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.worker.core.RequestData; +import org.apache.airavata.worker.core.SecurityContext; + +import java.io.Serializable; + +/** + * Abstract implementation of the security context. + */ +public abstract class AbstractSecurityContext implements SecurityContext, Serializable { + + private CredentialReader credentialReader; + private RequestData requestData; + + public AbstractSecurityContext(CredentialReader credentialReader, RequestData requestData) { + this.credentialReader = credentialReader; + this.requestData = requestData; + } + public AbstractSecurityContext() { + + } + + public CredentialReader getCredentialReader() { + return credentialReader; + } + + public RequestData getRequestData() { + return requestData; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java new file mode 100644 index 0000000..705b379 --- /dev/null +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java @@ -0,0 +1,524 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.worker.core.utils; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.airavata.worker.core.exceptions.SSHApiException; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Arrays; +import java.util.List; + +/** + * Utility class to do all ssh and scp related things. + */ +public class SSHUtils { + private static final Logger log = LoggerFactory.getLogger(SSHUtils.class); + + + /** + * This will copy a local file to a remote location + * + * @param remoteFile remote location you want to transfer the file, this cannot be a directory, if user pass + * a dirctory we do copy it to that directory but we simply return the directory name + * todo handle the directory name as input and return the proper final output file name + * @param localFile Local file to transfer, this can be a directory + * @return returns the final remote file path, so that users can use the new file location + */ + public static String scpTo(String localFile, String remoteFile, Session session) throws IOException, + JSchException, SSHApiException { + FileInputStream fis = null; + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + boolean ptimestamp = true; + + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile; + Channel channel = session.openChannel("exec"); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + ((ChannelExec) channel).setCommand(command); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + File _lfile = new File(localFile); + + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (localFile.lastIndexOf('/') > 0) { + command += localFile.substring(localFile.lastIndexOf('/') + 1); + } else { + command += localFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of localFile + fis = new FileInputStream(localFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + stdOutReader.onOutput(channel); + + + channel.disconnect(); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + //since remote file is always a file we just return the file + return remoteFile; + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, + JSchException, SSHApiException { + FileOutputStream fos = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + // exec 'scp -f remotefile' remotely + String command = "scp -f " + remoteFile; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + if (!channel.isClosed()){ + channel.connect(); + } + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix == null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + } + } + + /** + * This method will copy a remote file to a local directory + * + * @param sourceFile remote file path, this has to be a full qualified path + * @param sourceSession JSch session for source + * @param destinationFile This is the local file to copy, this can be a directory too + * @param destinationSession JSch Session for target + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws + IOException, JSchException { + OutputStream sout = null; + InputStream sin = null; + OutputStream dout = null; + InputStream din = null; + try { + String prefix = null; + + // exec 'scp -f sourceFile' + String sourceCommand = "scp -f " + sourceFile; + Channel sourceChannel = sourceSession.openChannel("exec"); + ((ChannelExec) sourceChannel).setCommand(sourceCommand); + StandardOutReader sourceStdOutReader = new StandardOutReader(); + ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError()); + // get I/O streams for remote scp + sout = sourceChannel.getOutputStream(); + sin = sourceChannel.getInputStream(); + sourceChannel.connect(); + + boolean ptimestamp = true; + // exec 'scp -t destinationFile' + String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + destinationFile; + Channel targetChannel = destinationSession.openChannel("exec"); + StandardOutReader targetStdOutReader = new StandardOutReader(); + ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError()); + ((ChannelExec) targetChannel).setCommand(command); + // get I/O streams for remote scp + dout = targetChannel.getOutputStream(); + din = targetChannel.getInputStream(); + targetChannel.connect(); + + if (checkAck(din) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new Exception(error); + } + + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + + log.info("Initiating transfer from:" + sourceFile + " To: " + destinationFile + ", Ignore Empty file : " + ignoreEmptyFile); + + while (true) { + int c = checkAck(sin); + if (c != 'C') { + break; + } + + // read '0644 ' + sin.read(buf, 0, 5); + + long fileSize = 0L; + while (true) { + if (sin.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + fileSize = fileSize * 10L + (long) (buf[0] - '0'); + } + + String fileName = null; + for (int i = 0; ; i++) { + sin.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + fileName = new String(buf, 0, i); + break; + } + } + + //FIXME: Remove me after fixing file transfer issue + if(fileSize == 0L){ + log.warn("*****Zero byte file*****. Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile); + }else{ + log.info("Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile); + } + + if (fileSize == 0L && !ignoreEmptyFile){ + String error = "Input file is empty..."; + log.error(error); + throw new JSchException(error); + } + String initData = "C0644 " + fileSize + " " + fileName + "\n"; + assert dout != null; + dout.write(initData.getBytes()); + dout.flush(); + + // send '\0' to source + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + + int rLength; + while (true) { + if (buf.length < fileSize) rLength = buf.length; + else rLength = (int) fileSize; + rLength = sin.read(buf, 0, rLength); // read content of the source File + if (rLength < 0) { + // error + break; + } + dout.write(buf, 0, rLength); // write to destination file + fileSize -= rLength; + if (fileSize == 0L) break; + } + + // send '\0' to target + buf[0] = 0; + dout.write(buf, 0, 1); + dout.flush(); + if (checkAck(din) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new Exception(error); + } + dout.close(); + dout = null; + + if (checkAck(sin) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new Exception(error); + } + + // send '\0' + buf[0] = 0; + sout.write(buf, 0, 1); + sout.flush(); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + throw new JSchException(e.getMessage()); + } finally { + try { + if (dout != null) dout.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (din != null) din.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (sout != null) sout.close(); + } catch (Exception ee) { + log.error("", ee); + } + try { + if (din != null) din.close(); + } catch (Exception ee) { + log.error("", ee); + } + } + } + + public static void makeDirectory(String path, Session session) throws IOException, JSchException, WorkerException { + + // exec 'scp -t rfile' remotely + String command = "mkdir -p " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + log.error("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName()); + throw e; + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("mkdir:")) { + throw new WorkerException(stdOutReader.getStdErrorString()); + } + + channel.disconnect(); + } + + public static List<String> listDirectory(String path, Session session) throws IOException, JSchException, + WorkerException { + + // exec 'scp -t rfile' remotely + String command = "ls " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + + throw new WorkerException("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + stdOutReader.getStdOutputString(); + if (stdOutReader.getStdErrorString().contains("ls:")) { + throw new WorkerException(stdOutReader.getStdErrorString()); + } + channel.disconnect(); + return Arrays.asList(stdOutReader.getStdOutputString().split("\n")); + } + + + static int checkAck(InputStream in) throws IOException { + int b = in.read(); + if (b == 0) return b; + if (b == -1) return b; + + if (b == 1 || b == 2) { + StringBuffer sb = new StringBuffer(); + int c; + do { + c = in.read(); + sb.append((char) c); + } + while (c != '\n'); + //FIXME: Redundant + if (b == 1) { // error + System.out.print(sb.toString()); + } + if (b == 2) { // fatal error + System.out.print(sb.toString()); + } + log.warn(sb.toString()); + } + return b; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java index 75a8062..b5ce833 100644 --- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java @@ -36,6 +36,7 @@ import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication; import org.apache.airavata.worker.core.cluster.ServerInfo; import org.apache.airavata.worker.core.config.ResourceConfig; import org.apache.airavata.worker.core.config.WorkerYamlConfigruation; +import org.apache.airavata.worker.core.context.ProcessContext; import org.apache.airavata.worker.core.exceptions.WorkerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,17 @@ public class WorkerFactory { return resources.get(resourceJobManagerType); } + public static SSHKeyAuthentication getStorageSSHKeyAuthentication(ProcessContext pc) + throws WorkerException, CredentialStoreException { + try { + return getSshKeyAuthentication(pc.getGatewayId(), + pc.getStorageResourceLoginUserName(), + pc.getStorageResourceCredentialToken()); + } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) { + throw new WorkerException("Couldn't build ssh authentication object", e); + } + } + public static SSHKeyAuthentication getSshKeyAuthentication(String gatewayId, String loginUserName, String credentialStoreToken) http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java ---------------------------------------------------------------------- diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java index a53d736..663428a 100644 --- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java +++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java @@ -8,10 +8,14 @@ import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.data.replica.*; +import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.status.*; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.*; import org.apache.airavata.registry.cpi.utils.Constants; import org.apache.airavata.worker.core.context.ProcessContext; @@ -20,6 +24,9 @@ import org.apache.airavata.worker.core.exceptions.WorkerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -232,4 +239,61 @@ public class WorkerUtils { return null; } } + + public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + String experimentId = processContext.getExperimentId(); + ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs(); + if (experimentOutputs != null && !experimentOutputs.isEmpty()){ + for (OutputDataObjectType expOutput : experimentOutputs){ + if (expOutput.getName().equals(outputName)){ + DataProductModel dataProductModel = new DataProductModel(); + dataProductModel.setGatewayId(processContext.getGatewayId()); + dataProductModel.setOwnerName(processContext.getProcessModel().getUserName()); + dataProductModel.setProductName(outputName); + dataProductModel.setDataProductType(DataProductType.FILE); + + DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel(); + replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId()); + replicaLocationModel.setReplicaName(outputName + " gateway data store copy"); + replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE); + replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT); + replicaLocationModel.setFilePath(outputVal); + dataProductModel.addToReplicaLocations(replicaLocationModel); + + ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog(); + String productUri = replicaCatalog.registerDataProduct(dataProductModel); + expOutput.setValue(productUri); + } + } + } + experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " : - Error while updating experiment outputs"; + throw new WorkerException(msg, e); + } + } + + public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException { + String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir(); + String filePath; + if(experimentDataDir != null && !experimentDataDir.isEmpty()) { + if(!experimentDataDir.endsWith(File.separator)){ + experimentDataDir += File.separator; + } + if (experimentDataDir.startsWith(File.separator)) { + filePath = experimentDataDir + fileName; + } else { + filePath = inputPath + experimentDataDir + fileName; + } + } else { + filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName; + } + //FIXME + return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null); + + } }
