Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 9f0e45b25 -> d231956e8


http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
new file mode 100644
index 0000000..adacb21
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
@@ -0,0 +1,195 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.security.ProxyCertOutHandler;
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Security context that builds UNICORE client configurations on top of the
+ * X.509 credentials provided by {@link X509SecurityContext}.
+ *
+ * Every factory method stores the configuration it builds in an instance
+ * field and returns that same object.
+ */
+public class UNICORESecurityContext extends X509SecurityContext {
+
+    private static final long serialVersionUID = 1L;
+    protected static final Logger log = LoggerFactory.getLogger(UNICORESecurityContext.class);
+
+    /** Configuration produced by the most recent factory call on this instance. */
+    private DefaultClientConfiguration secProperties;
+
+    /**
+     * Creates the context; both arguments are handed to the parent class unchanged.
+     *
+     * @param credentialReader reader used by the parent class to look up stored credentials
+     * @param requestData request data used by the parent class
+     */
+    public UNICORESecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+    /**
+     * Builds a client configuration from the credentials returned by
+     * {@link #getX509Credentials()}.
+     *
+     * @param enableMessageLogging turns on message logging when {@code TRUE};
+     *                             {@code null} is treated as {@code FALSE}
+     * @return the newly built configuration
+     * @throws WorkerException if the credentials cannot be obtained or the
+     *                         configuration cannot be created
+     * @throws ApplicationSettingsException declared for interface compatibility
+     */
+    public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging)
+            throws WorkerException, ApplicationSettingsException {
+        try {
+            X509Credential cred = getX509Credentials();
+            secProperties = new DefaultClientConfiguration(dcValidator, cred);
+            setExtraSettings();
+        } catch (WorkerException e) {
+            // Already the right type: do not bury it in a second WorkerException.
+            throw e;
+        } catch (Exception e) {
+            throw new WorkerException(e.getMessage(), e);
+        }
+        // Boolean.TRUE.equals avoids the NullPointerException that unboxing a null Boolean causes.
+        if (Boolean.TRUE.equals(enableMessageLogging)) {
+            secProperties.setMessageLogging(true);
+        }
+        return secProperties;
+    }
+
+    /**
+     * Builds a client configuration, generating a short lived certificate for the
+     * user when the user configuration asks for one.
+     *
+     * Falls back to {@link #getDefaultConfiguration(Boolean)} when no user
+     * configuration is given, when certificate generation is not requested, or
+     * when the user DN is missing.
+     *
+     * @param enableMessageLogging turns on message logging when {@code TRUE};
+     *                             {@code null} is treated as {@code FALSE}
+     * @param userDataModel user configuration carrying the generate-cert flag and the user DN
+     * @return the newly built configuration
+     * @throws WorkerException if the credential or the configuration cannot be created
+     * @throws ApplicationSettingsException declared for interface compatibility
+     */
+    public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging,
+                                                              UserConfigurationDataModel userDataModel)
+            throws WorkerException, ApplicationSettingsException {
+
+        if (userDataModel == null || !userDataModel.isGenerateCert()) {
+            return getDefaultConfiguration(enableMessageLogging);
+        }
+
+        String userDN = userDataModel.getUserDN();
+        if (userDN == null || "".equals(userDN)) {
+            log.warn("Cannot generate cert, falling back to GFAC configured MyProxy credentials");
+            return getDefaultConfiguration(enableMessageLogging);
+        }
+
+        log.info("Generating X.509 certificate for: " + userDN);
+        X509Credential cred;
+        try {
+            cred = generateCredentialFromServerSettings(userDN);
+        } catch (Exception e) {
+            throw new WorkerException("Error occured while generating a short lived credential for user:" + userDN, e);
+        }
+
+        try {
+            secProperties = new DefaultClientConfiguration(dcValidator, cred);
+            setExtraSettings();
+        } catch (Exception e) {
+            throw new WorkerException(e.getMessage(), e);
+        }
+
+        if (Boolean.TRUE.equals(enableMessageLogging)) {
+            secProperties.setMessageLogging(true);
+        }
+        secProperties.getETDSettings()
+                .setIssuerCertificateChain(secProperties.getCredential().getCertificateChain());
+
+        return secProperties;
+    }
+
+    /**
+     * Reads the CA certificate path, key path and key password from the server
+     * settings and uses them to issue a short lived credential for the given DN.
+     */
+    private X509Credential generateCredentialFromServerSettings(String userDN) throws Exception {
+        String caCertPath = ServerSettings.getSetting(BESConstants.PROP_CA_CERT_PATH, "");
+        String caKeyPath = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PATH, "");
+        String caKeyPass = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PASS, "");
+
+        if (caCertPath.equals("") || caKeyPath.equals("")) {
+            throw new IllegalStateException("CA certificate or key file path missing in the properties file. "
+                    + "Please make sure " + BESConstants.PROP_CA_CERT_PATH + " or "
+                    + BESConstants.PROP_CA_KEY_PATH + " are not empty.");
+        }
+
+        if ("".equals(caKeyPass)) {
+            log.warn("Caution: CA key has no password. For security reasons it is highly recommended to set a CA key password");
+        }
+        return generateShortLivedCredential(userDN, caCertPath, caKeyPath, caKeyPass);
+    }
+
+    /**
+     * Get server signed credentials. Each time it is invoked a new certificate
+     * is generated and a new configuration is returned.
+     *
+     * @param userID not used by this method
+     * @param userDN distinguished name the certificate is issued for
+     * @param caCertPath path of the CA certificate
+     * @param caKeyPath path of the CA private key
+     * @param caKeyPwd password of the CA private key
+     * @return the newly built configuration
+     * @throws WorkerException if the certificate or the configuration cannot be created
+     */
+    public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath,
+                                                                   String caKeyPath, String caKeyPwd)
+            throws WorkerException {
+        try {
+            KeyAndCertCredential cred =
+                    SecurityUtils.generateShortLivedCertificate(userDN, caCertPath, caKeyPath, caKeyPwd);
+            secProperties = new DefaultClientConfiguration(dcValidator, cred);
+            setExtraSettings();
+        } catch (Exception e) {
+            throw new WorkerException(e.getMessage(), e);
+        }
+
+        return secProperties;
+    }
+
+    /**
+     * Applies the settings shared by every configuration built here: extended
+     * trust delegation, message signing, HTTP timeouts and the proxy
+     * certificate out-handler.
+     */
+    private void setExtraSettings() {
+        secProperties.getETDSettings().setExtendTrustDelegation(true);
+        secProperties.setDoSignMessage(true);
+
+        // timeout in milliseconds
+        Properties extra = secProperties.getExtraSettings();
+        if (extra == null) {
+            // A freshly created Properties object must be stored back,
+            // otherwise the timeouts set below never reach the configuration.
+            extra = new Properties();
+            secProperties.setExtraSettings(extra);
+        }
+        extra.setProperty("http.connection.timeout", "5000");
+        extra.setProperty("http.socket.timeout", "5000");
+
+        // Keep the handlers that are already configured and add the proxy handler once.
+        String[] outHandlers = secProperties.getOutHandlerClassNames();
+        Set<String> handlerNames = (outHandlers == null)
+                ? new HashSet<String>()
+                : new HashSet<String>(Arrays.asList(outHandlers));
+        handlerNames.add(ProxyCertOutHandler.class.getName());
+
+        secProperties.setOutHandlerClassNames(handlerNames.toArray(new String[handlerNames.size()]));
+    }
+
+    /**
+     * Returns the first CN attribute value of the given DN. Not called from
+     * within this class.
+     */
+    private String getCNFromUserDN(String userDN) {
+        return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
new file mode 100644
index 0000000..1f83370
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+
+import java.net.URISyntaxException;
+
+/**
+ * Helpers that percent-encode the individual components of a URI string and
+ * build GridFTP URIs.
+ */
+public class URIUtils {
+
+    /** Scheme prefix that every URI built by {@link #createGsiftpURI} starts with. */
+    private static final String GSIFTP_SCHEME_PREFIX = "gsiftp://";
+
+    /**
+     * Encodes the authority, path, query and fragment of a URI, in that order.
+     *
+     * @param uri the raw URI string
+     * @return the URI with every component encoded
+     * @throws URIException if a component cannot be encoded
+     */
+    public static String encodeAll(String uri) throws URIException {
+        String result = encodeAuthority(uri);
+        // Each step works on the previous step's output; feeding the raw URI
+        // in again would throw the authority encoding away.
+        result = encodePath(result);
+        result = encodeQuery(result);
+        result = encodeFragment(result);
+        return result;
+    }
+
+    /**
+     * Encodes the authority, i.e. the text between "//" and the first of
+     * '/', '?', '#' or the end of the string.
+     *
+     * @param uri the raw URI string
+     * @return the URI with its authority encoded, or the input unchanged when it has no "//"
+     * @throws URIException if the authority cannot be encoded
+     */
+    public static String encodeAuthority(String uri) throws URIException {
+        int doubleSlashIndex = uri.indexOf("//");
+        if (doubleSlashIndex == -1) {
+            return uri;
+        }
+        int start = doubleSlashIndex + 2;
+        int end = indexOfAny(uri, "/?#", start);
+        String authority = URIUtil.encode(uri.substring(start, end), URI.allowed_authority);
+        return uri.substring(0, start) + authority + uri.substring(end);
+    }
+
+    /**
+     * Encodes the path. With an authority the path starts after the first '/'
+     * that follows it; without one it starts after the first ':'. The path ends
+     * at the first '?' or '#', or at the end of the string.
+     *
+     * @param uri the raw URI string
+     * @return the URI with its path encoded, or the input unchanged when no path is found
+     * @throws URIException if the path cannot be encoded
+     */
+    public static String encodePath(String uri) throws URIException {
+        int doubleSlashIndex = uri.indexOf("//");
+        int start;
+        if (doubleSlashIndex >= 0) {
+            start = indexOfAny(uri, "/?#", doubleSlashIndex + 2);
+            // The authority must be followed directly by '/'; a '/' that only
+            // shows up inside the query or fragment does not start a path.
+            if (start == uri.length() || uri.charAt(start) != '/') {
+                return uri;
+            }
+        } else {
+            start = uri.indexOf(':');
+            if (start == -1) {
+                return uri;
+            }
+        }
+
+        int end = indexOfAny(uri, "?#", start + 1);
+        String path = URIUtil.encode(uri.substring(start + 1, end), URI.allowed_abs_path);
+        return uri.substring(0, start + 1) + path + uri.substring(end);
+    }
+
+    /**
+     * Encodes the query, i.e. the text between the first '?' and the first '#'
+     * or the end of the string.
+     *
+     * @param uri the raw URI string
+     * @return the URI with its query encoded, or the input unchanged when it has no query
+     * @throws URIException if the query cannot be encoded
+     */
+    public static String encodeQuery(String uri) throws URIException {
+        int fragmentStart = uri.indexOf('#');
+        int queryEnd = (fragmentStart == -1) ? uri.length() : fragmentStart;
+        int queryStart = uri.indexOf('?');
+        // A '?' located after '#' belongs to the fragment; treating it as a
+        // query start would make the substring bounds run backwards.
+        if (queryStart == -1 || queryStart > queryEnd) {
+            return uri;
+        }
+
+        String query = URIUtil.encode(uri.substring(queryStart + 1, queryEnd), URI.allowed_query);
+        return uri.substring(0, queryStart + 1) + query + uri.substring(queryEnd);
+    }
+
+    /**
+     * Encodes the fragment, i.e. everything after the first '#'.
+     *
+     * @param uri the raw URI string
+     * @return the URI with its fragment encoded, or the input unchanged when it has no '#'
+     * @throws URIException if the fragment cannot be encoded
+     */
+    public static String encodeFragment(String uri) throws URIException {
+        int fragmentStart = uri.indexOf('#');
+        if (fragmentStart == -1) {
+            return uri;
+        }
+
+        String fragment = URIUtil.encode(uri.substring(fragmentStart + 1), URI.allowed_fragment);
+        return uri.substring(0, fragmentStart + 1) + fragment;
+    }
+
+    /**
+     * Builds a gsiftp URI from a host and a path. The scheme prefix is added
+     * when the host does not already start with it, and a '/' is inserted
+     * between host and path when the host does not end with one.
+     *
+     * @param host host name, optionally already prefixed with "gsiftp://"
+     * @param localPath path appended after the host
+     * @return the resulting URI
+     * @throws URISyntaxException if the assembled string is not a valid URI
+     */
+    public static java.net.URI createGsiftpURI(String host, String localPath) throws URISyntaxException {
+        StringBuilder buf = new StringBuilder();
+        if (!host.startsWith(GSIFTP_SCHEME_PREFIX)) {
+            buf.append(GSIFTP_SCHEME_PREFIX);
+        }
+        buf.append(host);
+        if (!host.endsWith("/")) {
+            buf.append("/");
+        }
+        buf.append(localPath);
+        return new java.net.URI(buf.toString());
+    }
+
+    /**
+     * Returns the index of the first character at or after {@code from} that
+     * occurs in {@code delimiters}, or the length of the string when there is none.
+     */
+    private static int indexOfAny(String text, String delimiters, int from) {
+        for (int i = Math.max(from, 0); i < text.length(); i++) {
+            if (delimiters.indexOf(text.charAt(i)) >= 0) {
+                return i;
+            }
+        }
+        return text.length();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
new file mode 100644
index 0000000..5be5bc1
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import 
org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.context.AbstractSecurityContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import 
org.apache.cxf.interceptor.security.AbstractSecurityContextInInterceptor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Handles X509 Certificate based security.
+ */
+public class X509SecurityContext extends AbstractSecurityContext {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final Logger log = LoggerFactory.getLogger(X509SecurityContext.class);
+
+    /*
+     * context name
+     */
+    public static final String X509_SECURITY_CONTEXT = "x509.security.context";
+
+    /**
+     * Threshold compared against the remaining certificate lifetime (computed
+     * in milliseconds) in {@link #getX509Credentials()}.
+     */
+    public static final int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+
+    /** Short lived certificates are back-dated by 15 minutes. */
+    private static final long SHORT_LIVED_BACKDATE_MILLIS = 1000L * 60L * 15L;
+
+    /** Short lived certificates are valid for 30 hours from their start time. */
+    private static final long SHORT_LIVED_LIFETIME_MILLIS = 30L * 3600L * 1000L;
+
+    /** Key size used for the key pair of a short lived certificate. */
+    private static final int SHORT_LIVED_KEY_LENGTH = 1024;
+
+    /** Signature algorithm name handed to the certificate builder. */
+    private static final String SHORT_LIVED_SIGNATURE_ALGORITHM = "SHA1withRSA";
+
+    /** Number of random bits in the serial number of a short lived certificate. */
+    private static final int SERIAL_NUMBER_BITS = 64;
+
+    /** Validator built once from the trusted certificate directory; stays null if that setup fails. */
+    protected static DirectoryCertChainValidator dcValidator;
+
+    /** Credentials cached by {@link #getX509Credentials()}. */
+    private X509Credential x509Credentials = null;
+
+    static {
+        try {
+            setUpTrustedCertificatePath();
+            // set up directory based trust validator
+            dcValidator = getTrustedCerts();
+        } catch (Exception e) {
+            // Best effort: the failure is logged and dcValidator is left unset.
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Publishes the absolute form of the given path in the system property
+     * named by {@code Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY}.
+     *
+     * @param trustedCertificatePath location of the trusted certificates
+     * @throws RuntimeException if the path does not exist or cannot be read
+     */
+    public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+
+        File file = new File(trustedCertificatePath);
+
+        if (file.exists() && file.canRead()) {
+            System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+            return;
+        }
+
+        // Log the working directory to help diagnose a relative path.
+        log.info("Current directory " + new File(".").getAbsolutePath());
+        throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+    }
+
+    /**
+     * Reads the trusted certificate location from the server settings and
+     * publishes it through {@link #setUpTrustedCertificatePath(String)}.
+     */
+    private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+        setUpTrustedCertificatePath(ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+    }
+
+    /**
+     * Gets the trusted certificate path, which is stored in the system property
+     * named by {@code Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY}.
+     *
+     * @return The trusted certificate path as a string.
+     */
+    public static String getTrustedCertificatePath() {
+        return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY);
+    }
+
+    /**
+     * Creates the context; both arguments are handed to the parent class unchanged.
+     *
+     * @param credentialReader reader used to look up stored credentials, may be null
+     * @param requestData request data holding the token, gateway and MyProxy details
+     */
+    public X509SecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+    /**
+     * Gets X509Credentials. The process is as follows;
+     * Without a credential reader, fresh default (MyProxy) credentials are returned on every call.
+     * If credentials were queried for the first time create credentials.
+     *   1. Try creating credentials using certificates stored in the credential store
+     *   2. If 1 fails use user name and password to create credentials
+     * On later calls the cached certificate is checked and replaced by default
+     * credentials when it is no longer valid.
+     *
+     * @return x509credentials (from CANL security API)
+     * @throws WorkerException if no credentials can be obtained
+     * @throws ApplicationSettingsException declared for interface compatibility
+     */
+    public X509Credential getX509Credentials() throws WorkerException, ApplicationSettingsException {
+
+        if (getCredentialReader() == null) {
+            return getDefaultCredentials();
+        }
+
+        if (x509Credentials == null) {
+            try {
+                x509Credentials = getCredentialsFromStore();
+            } catch (Exception e) {
+                log.error("An exception occurred while retrieving credentials from the credential store. " +
+                        "Will continue with my proxy user name and password.", e);
+            }
+
+            // If store does not have credentials try to get from user name and password
+            if (x509Credentials == null) {
+                x509Credentials = getDefaultCredentials();
+            }
+
+            // if still null, throw an exception
+            if (x509Credentials == null) {
+                throw new WorkerException("Unable to retrieve my proxy credentials to continue operation.");
+            }
+            return x509Credentials;
+        }
+
+        try {
+            final long remainingTime =
+                    x509Credentials.getCertificate().getNotAfter().getTime() - System.currentTimeMillis();
+
+            if (remainingTime < CREDENTIAL_RENEWING_THRESH_HOLD) {
+                log.warn("Do not support credentials renewal");
+            }
+
+            try {
+                x509Credentials.getCertificate().checkValidity();
+            } catch (Exception e) {
+                // Only announce the fallback when it actually happens.
+                log.info("Fall back to get new default credentials");
+                x509Credentials = getDefaultCredentials();
+            }
+        } catch (Exception e) {
+            throw new WorkerException("Unable to retrieve remaining life time from credentials.", e);
+        }
+
+        return x509Credentials;
+    }
+
+    /**
+     * Reads the credentials from credential store.
+     *
+     * @return If token is found in the credential store and holds a certificate
+     *         credential, will return a valid credential. Else returns null.
+     * @throws Exception If an error occurred while retrieving credentials.
+     */
+    public X509Credential getCredentialsFromStore() throws Exception {
+
+        if (getCredentialReader() == null) {
+            return null;
+        }
+
+        Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+                getRequestData().getTokenId());
+
+        if (credential == null) {
+            log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+                    + "gateway id - " + getRequestData().getGatewayId());
+            return null;
+        }
+
+        if (!(credential instanceof CertificateCredential)) {
+            log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+                    "Credential type - " + credential.getClass().getName());
+            return null;
+        }
+
+        log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+                " gateway id - " + getRequestData().getGatewayId());
+
+        CertificateCredential certificateCredential = (CertificateCredential) credential;
+        X509Certificate[] certificates = certificateCredential.getCertificates();
+        return new KeyAndCertCredential(certificateCredential.getPrivateKey(), certificates);
+    }
+
+    /**
+     * Gets the default proxy certificate by logging on to the MyProxy server
+     * described by the request data.
+     *
+     * @return Default my proxy credentials.
+     * @throws WorkerException if the MyProxy exchange fails
+     * @throws ApplicationSettingsException declared for interface compatibility
+     */
+    public X509Credential getDefaultCredentials() throws WorkerException, ApplicationSettingsException {
+        MyProxyLogon logon = new MyProxyLogon();
+        logon.setValidator(dcValidator);
+        logon.setHost(getRequestData().getMyProxyServerUrl());
+        logon.setPort(getRequestData().getMyProxyPort());
+        logon.setUsername(getRequestData().getMyProxyUserName());
+        logon.setPassphrase(getRequestData().getMyProxyPassword().toCharArray());
+        logon.setLifetime(getRequestData().getMyProxyLifeTime());
+
+        boolean disconnected = false;
+        try {
+            logon.connect();
+            logon.logon();
+            logon.getCredentials();
+            logon.disconnect();
+            disconnected = true;
+            PrivateKey pk = logon.getPrivateKey();
+            return new KeyAndCertCredential(pk, new X509Certificate[]{logon.getCertificate()});
+        } catch (Exception e) {
+            throw new WorkerException("An error occurred while retrieving default security credentials.", e);
+        } finally {
+            if (!disconnected) {
+                // A failed exchange must not leave the connection open; a failure
+                // while closing must not hide the original error.
+                try {
+                    logon.disconnect();
+                } catch (Exception ignored) {
+                    log.debug("Could not close the MyProxy connection after a failed logon", ignored);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds a validator that trusts the "*.0" and "*.pem" files found in the
+     * trusted certificate directory.
+     */
+    private static DirectoryCertChainValidator getTrustedCerts() throws Exception {
+        String certLocation = getTrustedCertificatePath();
+        List<String> trustedCert = new ArrayList<String>();
+        trustedCert.add(certLocation + "/*.0");
+        trustedCert.add(certLocation + "/*.pem");
+        return new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1, 60000, null);
+    }
+
+    /**
+     * Returns the first CN attribute value of the given DN. Not called from
+     * within this class.
+     */
+    private String getCNFromUserDN(String userDN) {
+        return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+    }
+
+    /**
+     * Issues a short lived certificate for the given DN, signed with the CA key,
+     * together with a freshly generated key pair.
+     *
+     * The certificate is valid from 15 minutes before the call until 30 hours
+     * after that start time. The returned chain holds the new certificate
+     * followed by the CA certificate.
+     *
+     * @param userDN subject DN of the new certificate
+     * @param caCertPath path of the PEM encoded CA certificate
+     * @param caKeyPath path of the PEM encoded CA private key
+     * @param caPwd password of the CA private key
+     * @return the new private key with its certificate chain
+     * @throws Exception if the CA credential cannot be loaded or the
+     *                   certificate cannot be built or verified
+     */
+    public KeyAndCertCredential generateShortLivedCredential(String userDN, String caCertPath, String caKeyPath,
+            String caPwd) throws Exception {
+        final long startTime = System.currentTimeMillis() - SHORT_LIVED_BACKDATE_MILLIS;
+        final long endTime = startTime + SHORT_LIVED_LIFETIME_MILLIS;
+
+        KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+        // NOTE(review): a 1024 bit key signed with SHA1withRSA is weak by current
+        // standards; confirm what the target servers accept before raising either.
+        KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+        kpg.initialize(SHORT_LIVED_KEY_LENGTH);
+        KeyPair pair = kpg.generateKeyPair();
+
+        X500Principal subjectDN = new X500Principal(userDN);
+
+        SubjectPublicKeyInfo publicKeyInfo;
+        ASN1InputStream keyStream = new ASN1InputStream(pair.getPublic().getEncoded());
+        try {
+            publicKeyInfo = SubjectPublicKeyInfo.getInstance(keyStream.readObject());
+        } catch (IOException e) {
+            throw new InvalidKeyException("Can not parse the public key "
+                    + "being included in the short lived certificate", e);
+        } finally {
+            try {
+                keyStream.close();
+            } catch (IOException ignored) {
+                // Nothing useful to do if closing the stream fails.
+            }
+        }
+
+        X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+        X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+        // Serial numbers come from a cryptographically strong source and are wide
+        // enough that two certificates of the same CA are very unlikely to collide.
+        BigInteger serialNumber = new BigInteger(SERIAL_NUMBER_BITS, new SecureRandom());
+
+        X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, serialNumber,
+                new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
+
+        AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+        X509Certificate certificate =
+                certBuilder.build(caCred.getKey(), sigAlgId, SHORT_LIVED_SIGNATURE_ALGORITHM, null, null);
+
+        // Fail early if the new certificate is not usable right now or was not signed by the CA.
+        certificate.checkValidity(new Date());
+        certificate.verify(caCred.getCertificate().getPublicKey());
+
+        return new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+                caCred.getCertificate() });
+    }
+
+    /**
+     * Loads the PEM encoded CA private key and certificate from the given files.
+     * A null password is passed on as null.
+     */
+    private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password)
+            throws Exception {
+        InputStream isKey = null;
+        InputStream isCert = null;
+        try {
+            isKey = new FileInputStream(caKeyPath);
+            char[] keyPassword = (password == null) ? null : password.toCharArray();
+            PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, keyPassword);
+
+            isCert = new FileInputStream(caCertPath);
+            X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+            return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+        } finally {
+            // Close both streams independently so that a failure on the first
+            // cannot leak the second or hide the original exception.
+            closeQuietly(isKey);
+            closeQuietly(isCert);
+        }
+    }
+
+    /** Closes the stream if it is not null, logging instead of throwing on failure. */
+    private static void closeQuietly(InputStream stream) {
+        if (stream == null) {
+            return;
+        }
+        try {
+            stream.close();
+        } catch (IOException e) {
+            log.debug("Could not close input stream", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/pom.xml 
b/modules/worker/worker-core/pom.xml
index 8f00821..32d37d1 100644
--- a/modules/worker/worker-core/pom.xml
+++ b/modules/worker/worker-core/pom.xml
@@ -63,6 +63,12 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
new file mode 100644
index 0000000..8a961c5
--- /dev/null
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+/**
+ * User: AmilaJ ([email protected])
+ * Date: 6/28/13
+ * Time: 3:28 PM
+ */
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.worker.core.utils.WorkerConstants;
+
+/**
+ * Carries the data that accompanies a request: the credential token, the
+ * requesting user, the gateway id and the MyProxy connection settings.
+ * MyProxy values that have not been set explicitly are looked up through
+ * {@link ServerSettings} when they are read.
+ */
+public class RequestData {
+
+    private static final int DEFAULT_LIFE_TIME = 3600;
+    private static final int DEFAULT_MY_PROXY_PORT = 7512;
+
+    private String tokenId;
+    private String requestUser;
+    private String gatewayId;
+
+    private String myProxyServerUrl = null;
+    private int myProxyPort = 0;
+    private String myProxyUserName = null;
+    private String myProxyPassword = null;
+    private int myProxyLifeTime = DEFAULT_LIFE_TIME;
+
+    /** Creates an empty instance; all identifying fields stay null. */
+    public RequestData() {
+    }
+
+    /** Creates an instance that only knows its gateway id. */
+    public RequestData(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    /** Creates an instance with token id, requesting user and gateway id. */
+    public RequestData(String tokenId, String requestUser, String gatewayId) {
+        this.tokenId = tokenId;
+        this.requestUser = requestUser;
+        this.gatewayId = gatewayId;
+    }
+
+    // ---- request identity -------------------------------------------------
+
+    public String getTokenId() {
+        return this.tokenId;
+    }
+
+    public void setTokenId(String tokenId) {
+        this.tokenId = tokenId;
+    }
+
+    public String getRequestUser() {
+        return this.requestUser;
+    }
+
+    public void setRequestUser(String requestUser) {
+        this.requestUser = requestUser;
+    }
+
+    public String getGatewayId() {
+        return this.gatewayId;
+    }
+
+    public void setGatewayId(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    // ---- MyProxy settings ---------------------------------------------------
+
+    /**
+     * Returns the MyProxy server URL, reading it from the server settings and
+     * remembering it the first time it is requested while still unset.
+     */
+    public String getMyProxyServerUrl() throws ApplicationSettingsException {
+        if (myProxyServerUrl != null) {
+            return myProxyServerUrl;
+        }
+        myProxyServerUrl = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER);
+        return myProxyServerUrl;
+    }
+
+    public void setMyProxyServerUrl(String myProxyServerUrl) {
+        this.myProxyServerUrl = myProxyServerUrl;
+    }
+
+    /**
+     * Returns the MyProxy port. While the stored port is 0 the value is taken
+     * from the server settings, falling back to the built-in default port.
+     */
+    public int getMyProxyPort() {
+        if (myProxyPort != 0) {
+            return myProxyPort;
+        }
+        String fallbackPort = Integer.toString(DEFAULT_MY_PROXY_PORT);
+        String configuredPort = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER_PORT, fallbackPort);
+        myProxyPort = Integer.parseInt(configuredPort);
+        return myProxyPort;
+    }
+
+    public void setMyProxyPort(int myProxyPort) {
+        this.myProxyPort = myProxyPort;
+    }
+
+    /**
+     * Returns the MyProxy user name, reading it from the server settings and
+     * remembering it the first time it is requested while still unset.
+     */
+    public String getMyProxyUserName() throws ApplicationSettingsException {
+        if (myProxyUserName != null) {
+            return myProxyUserName;
+        }
+        myProxyUserName = ServerSettings.getSetting(WorkerConstants.MYPROXY_USER);
+        return myProxyUserName;
+    }
+
+    public void setMyProxyUserName(String myProxyUserName) {
+        this.myProxyUserName = myProxyUserName;
+    }
+
+    /**
+     * Returns the MyProxy password, reading it from the server settings and
+     * remembering it the first time it is requested while still unset.
+     */
+    public String getMyProxyPassword() throws ApplicationSettingsException {
+        if (myProxyPassword != null) {
+            return myProxyPassword;
+        }
+        myProxyPassword = ServerSettings.getSetting(WorkerConstants.MYPROXY_PASS);
+        return myProxyPassword;
+    }
+
+    public void setMyProxyPassword(String myProxyPassword) {
+        this.myProxyPassword = myProxyPassword;
+    }
+
+    /**
+     * Returns the MyProxy life time. The server settings are consulted on every
+     * call, with the currently stored value acting as the fallback, and the
+     * result replaces the stored value.
+     */
+    public int getMyProxyLifeTime() {
+        String currentLifeTime = Integer.toString(myProxyLifeTime);
+        myProxyLifeTime = Integer.parseInt(ServerSettings.getSetting(WorkerConstants.MYPROXY_LIFE, currentLifeTime));
+        return myProxyLifeTime;
+    }
+
+    public void setMyProxyLifeTime(int myProxyLifeTime) {
+        this.myProxyLifeTime = myProxyLifeTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
new file mode 100644
index 0000000..aec1d83
--- /dev/null
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+/**
+ * Common type for security contexts. The interface itself declares no members;
+ * implementations supply whatever credentials their transport needs.
+ */
+public interface SecurityContext {
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
new file mode 100644
index 0000000..c4798e2
--- /dev/null
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.context;
+
+/**
+ * User: AmilaJ ([email protected])
+ * Date: 6/26/13
+ * Time: 4:33 PM
+ */
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.SecurityContext;
+
+import java.io.Serializable;
+
+/**
+ * Base class for security contexts that carry a credential reader together
+ * with the data of the request being processed.
+ *
+ * NOTE(review): the class is declared {@link Serializable} while both fields
+ * are non-transient project types - confirm they can actually be serialized.
+ */
+public abstract class AbstractSecurityContext implements SecurityContext, Serializable {
+
+    private CredentialReader credentialReader;
+    private RequestData requestData;
+
+    /** Creates a context without a credential reader and without request data. */
+    public AbstractSecurityContext() {
+        this(null, null);
+    }
+
+    /**
+     * Creates a context around the given collaborators.
+     *
+     * @param credentialReader reader exposed through {@link #getCredentialReader()}
+     * @param requestData      request data exposed through {@link #getRequestData()}
+     */
+    public AbstractSecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        this.requestData = requestData;
+        this.credentialReader = credentialReader;
+    }
+
+    /** @return the credential reader given at construction time, possibly null */
+    public CredentialReader getCredentialReader() {
+        return this.credentialReader;
+    }
+
+    /** @return the request data given at construction time, possibly null */
+    public RequestData getRequestData() {
+        return this.requestData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
new file mode 100644
index 0000000..705b379
--- /dev/null
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
@@ -0,0 +1,524 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.core.utils;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class to do all ssh and scp related things.
+ */
+public class SSHUtils {
+       private static final Logger log = 
LoggerFactory.getLogger(SSHUtils.class);
+
+
+       /**
+        * Copies a local file to a remote location by driving a remote
+        * {@code scp -p -t} over an exec channel.
+        *
+        * NOTE(review): earlier documentation said the local path may be a directory,
+        * but the code below always sends a single regular file - confirm with callers.
+        *
+        * @param localFile  local file to transfer
+        * @param remoteFile remote destination; it is appended unquoted to the remote
+        *                   command and returned unchanged, even if it names a directory
+        * @param session    JSch session used to open the exec channel
+        * @return the {@code remoteFile} argument
+        * @throws IOException     if reading the local file or using the channel streams fails
+        * @throws JSchException   if the exec channel cannot be opened or connected
+        * @throws SSHApiException if a protocol step is not acknowledged or the remote
+        *                         error output contains {@code "scp:"}
+        */
+       public static String scpTo(String localFile, String remoteFile, Session session) throws IOException,
+                       JSchException, SSHApiException {
+               File _lfile = new File(localFile);
+
+               // exec 'scp -p -t rfile' remotely
+               String command = "scp -p -t " + remoteFile;
+               Channel channel = session.openChannel("exec");
+               try {
+                       StandardOutReader stdOutReader = new StandardOutReader();
+                       ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+                       ((ChannelExec) channel).setCommand(command);
+
+                       // get I/O streams for remote scp
+                       OutputStream out = channel.getOutputStream();
+                       InputStream in = channel.getInputStream();
+
+                       channel.connect();
+                       requireAck(in);
+
+                       // Send the timestamp line. The access time should be sent as the second
+                       // value, but it is not accessible here, so the modification time is reused.
+                       long modifiedSeconds = _lfile.lastModified() / 1000;
+                       command = "T" + modifiedSeconds + " 0 " + modifiedSeconds + " 0\n";
+                       out.write(command.getBytes());
+                       out.flush();
+                       requireAck(in);
+
+                       // send "C0644 filesize filename", where filename must not include '/'
+                       // (>= 0 so that a path such as "/file" is stripped as well)
+                       int lastSlash = localFile.lastIndexOf('/');
+                       String fileName = lastSlash >= 0 ? localFile.substring(lastSlash + 1) : localFile;
+                       command = "C0644 " + _lfile.length() + " " + fileName + "\n";
+                       out.write(command.getBytes());
+                       out.flush();
+                       requireAck(in);
+
+                       // send the content of localFile; the stream is closed even on failure
+                       byte[] buf = new byte[1024];
+                       try (FileInputStream fis = new FileInputStream(localFile)) {
+                               int len;
+                               while ((len = fis.read(buf, 0, buf.length)) > 0) {
+                                       out.write(buf, 0, len);
+                               }
+                       }
+
+                       // send '\0' to mark the end of the content
+                       buf[0] = 0;
+                       out.write(buf, 0, 1);
+                       out.flush();
+                       requireAck(in);
+
+                       out.close();
+                       stdOutReader.onOutput(channel);
+                       channel.disconnect();
+                       if (stdOutReader.getStdErrorString().contains("scp:")) {
+                               throw new SSHApiException(stdOutReader.getStdErrorString());
+                       }
+                       // the remote target is treated as a file, so the given path is returned
+                       return remoteFile;
+               } finally {
+                       // Releases the channel on every failure path; a no-op after the
+                       // regular disconnect above.
+                       channel.disconnect();
+               }
+       }
+
+       /** Reads one scp acknowledgement and fails unless it is the success byte (0). */
+       private static void requireAck(InputStream in) throws IOException, SSHApiException {
+               if (checkAck(in) != 0) {
+                       String error = "Error Reading input Stream";
+                       log.error(error);
+                       throw new SSHApiException(error);
+               }
+       }
+
+       /**
+        * Copies a remote file to the local file system by driving a remote
+        * {@code scp -f} over an exec channel.
+        *
+        * @param remoteFile remote file path; it is appended unquoted to the remote command
+        * @param localFile  local target; when it is an existing directory the received
+        *                   file is stored inside it under the name announced by the remote side
+        * @param session    JSch session used to open the exec channel
+        * @throws IOException     if a channel stream or the local file cannot be used
+        * @throws JSchException   if the exec channel cannot be opened or connected
+        * @throws SSHApiException if the transfer is not acknowledged or the remote error
+        *                         output contains {@code "scp:"}
+        */
+       public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException,
+                       JSchException, SSHApiException {
+               FileOutputStream fos = null;
+               Channel channel = null;
+               try {
+                       String prefix = null;
+                       if (new File(localFile).isDirectory()) {
+                               prefix = localFile + File.separator;
+                       }
+
+                       // exec 'scp -f remotefile' remotely
+                       String command = "scp -f " + remoteFile;
+                       channel = session.openChannel("exec");
+                       ((ChannelExec) channel).setCommand(command);
+
+                       StandardOutReader stdOutReader = new StandardOutReader();
+                       ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+                       // get I/O streams for remote scp
+                       OutputStream out = channel.getOutputStream();
+                       InputStream in = channel.getInputStream();
+
+                       if (!channel.isClosed()) {
+                               channel.connect();
+                       }
+
+                       byte[] buf = new byte[1024];
+
+                       // send '\0' to start the transfer
+                       buf[0] = 0;
+                       out.write(buf, 0, 1);
+                       out.flush();
+
+                       while (true) {
+                               // every file record starts with 'C'; anything else ends the loop
+                               int c = checkAck(in);
+                               if (c != 'C') {
+                                       break;
+                               }
+
+                               // read '0644 '
+                               in.read(buf, 0, 5);
+
+                               // decimal file size, terminated by a space
+                               long filesize = 0L;
+                               while (true) {
+                                       if (in.read(buf, 0, 1) < 0) {
+                                               // error
+                                               break;
+                                       }
+                                       if (buf[0] == ' ') break;
+                                       filesize = filesize * 10L + (long) (buf[0] - '0');
+                               }
+
+                               // file name, terminated by a line feed
+                               String file = null;
+                               for (int i = 0; ; i++) {
+                                       in.read(buf, i, 1);
+                                       if (buf[i] == (byte) 0x0a) {
+                                               file = new String(buf, 0, i);
+                                               break;
+                                       }
+                               }
+
+                               // send '\0'
+                               buf[0] = 0;
+                               out.write(buf, 0, 1);
+                               out.flush();
+
+                               // read the content of the remote file
+                               fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+                               int foo;
+                               while (true) {
+                                       if (buf.length < filesize) foo = buf.length;
+                                       else foo = (int) filesize;
+                                       foo = in.read(buf, 0, foo);
+                                       if (foo < 0) {
+                                               // error
+                                               break;
+                                       }
+                                       fos.write(buf, 0, foo);
+                                       filesize -= foo;
+                                       if (filesize == 0L) break;
+                               }
+                               fos.close();
+                               fos = null;
+
+                               if (checkAck(in) != 0) {
+                                       String error = "Error transfering the file content";
+                                       log.error(error);
+                                       throw new SSHApiException(error);
+                               }
+
+                               // send '\0'
+                               buf[0] = 0;
+                               out.write(buf, 0, 1);
+                               out.flush();
+                       }
+                       stdOutReader.onOutput(channel);
+                       if (stdOutReader.getStdErrorString().contains("scp:")) {
+                               throw new SSHApiException(stdOutReader.getStdErrorString());
+                       }
+
+               } catch (Exception e) {
+                       log.error(e.getMessage(), e);
+                       // Propagate instead of swallowing: otherwise callers cannot tell a
+                       // failed copy from a successful one.
+                       throw e;
+               } finally {
+                       try {
+                               if (fos != null) fos.close();
+                       } catch (Exception ignored) {
+                               // best effort: a failure to close must not mask the primary error
+                       }
+                       // The channel was never released before; disconnect it on every path.
+                       if (channel != null) {
+                               channel.disconnect();
+                       }
+               }
+       }
+
+    /**
+     * Streams a file from one remote host to another by connecting a remote
+     * {@code scp -f} on the source session to a remote {@code scp -p -t} on the
+     * destination session; the content passes through this process in memory.
+     *
+     * @param sourceFile         path of the file on the source host; appended unquoted to the command
+     * @param sourceSession      JSch session for the source host
+     * @param destinationFile    target path on the destination host; appended unquoted to the command
+     * @param destinationSession JSch session for the destination host
+     * @param ignoreEmptyFile    when false, a zero-byte source file is reported as an error
+     * @throws IOException   declared for callers; failures inside the transfer are
+     *                       reported as {@link JSchException}
+     * @throws JSchException if any step of the transfer fails; the original failure is the cause
+     */
+    public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws
+            IOException, JSchException {
+        OutputStream sout = null;
+        InputStream sin = null;
+        OutputStream dout = null;
+        InputStream din = null;
+        Channel sourceChannel = null;
+        Channel targetChannel = null;
+        try {
+            // exec 'scp -f sourceFile'
+            String sourceCommand = "scp -f " + sourceFile;
+            sourceChannel = sourceSession.openChannel("exec");
+            ((ChannelExec) sourceChannel).setCommand(sourceCommand);
+            StandardOutReader sourceStdOutReader = new StandardOutReader();
+            ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError());
+            // get I/O streams for remote scp
+            sout = sourceChannel.getOutputStream();
+            sin = sourceChannel.getInputStream();
+            sourceChannel.connect();
+
+            // exec 'scp -p -t destinationFile'
+            String command = "scp -p -t " + destinationFile;
+            targetChannel = destinationSession.openChannel("exec");
+            StandardOutReader targetStdOutReader = new StandardOutReader();
+            ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError());
+            ((ChannelExec) targetChannel).setCommand(command);
+            // get I/O streams for remote scp
+            dout = targetChannel.getOutputStream();
+            din = targetChannel.getInputStream();
+            targetChannel.connect();
+
+            if (checkAck(din) != 0) {
+                String error = "Error Reading input Stream";
+                log.error(error);
+                throw new JSchException(error);
+            }
+
+            byte[] buf = new byte[1024];
+
+            // send '\0' to the source to start the transfer
+            buf[0] = 0;
+            sout.write(buf, 0, 1);
+            sout.flush();
+
+            log.info("Initiating transfer from:" + sourceFile + " To: " + destinationFile + ", Ignore Empty file : " + ignoreEmptyFile);
+
+            while (true) {
+                // every file record starts with 'C'; anything else ends the loop
+                int c = checkAck(sin);
+                if (c != 'C') {
+                    break;
+                }
+
+                // read '0644 '
+                sin.read(buf, 0, 5);
+
+                // decimal file size, terminated by a space
+                long fileSize = 0L;
+                while (true) {
+                    if (sin.read(buf, 0, 1) < 0) {
+                        // error
+                        break;
+                    }
+                    if (buf[0] == ' ') break;
+                    fileSize = fileSize * 10L + (long) (buf[0] - '0');
+                }
+
+                // file name, terminated by a line feed
+                String fileName = null;
+                for (int i = 0; ; i++) {
+                    sin.read(buf, i, 1);
+                    if (buf[i] == (byte) 0x0a) {
+                        fileName = new String(buf, 0, i);
+                        break;
+                    }
+                }
+
+                //FIXME: Remove me after fixing file transfer issue
+                if (fileSize == 0L) {
+                    log.warn("*****Zero byte file*****. Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+                } else {
+                    log.info("Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+                }
+
+                if (fileSize == 0L && !ignoreEmptyFile) {
+                    String error = "Input file is empty...";
+                    log.error(error);
+                    throw new JSchException(error);
+                }
+                String initData = "C0644 " + fileSize + " " + fileName + "\n";
+                assert dout != null;
+                dout.write(initData.getBytes());
+                dout.flush();
+
+                // send '\0' to source
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+
+                int rLength;
+                while (true) {
+                    if (buf.length < fileSize) rLength = buf.length;
+                    else rLength = (int) fileSize;
+                    rLength = sin.read(buf, 0, rLength); // read content of the source file
+                    if (rLength < 0) {
+                        // error
+                        break;
+                    }
+                    dout.write(buf, 0, rLength); // write to destination file
+                    fileSize -= rLength;
+                    if (fileSize == 0L) break;
+                }
+
+                // send '\0' to target
+                buf[0] = 0;
+                dout.write(buf, 0, 1);
+                dout.flush();
+                if (checkAck(din) != 0) {
+                    String error = "Error Reading input Stream";
+                    log.error(error);
+                    throw new JSchException(error);
+                }
+                dout.close();
+                dout = null;
+
+                if (checkAck(sin) != 0) {
+                    String error = "Error transfering the file content";
+                    log.error(error);
+                    throw new JSchException(error);
+                }
+
+                // send '\0'
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+            }
+
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            // keep the original failure as the cause so its stack trace is not lost
+            throw new JSchException(e.getMessage(), e);
+        } finally {
+            try {
+                if (dout != null) dout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (din != null) din.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (sout != null) sout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                // previously 'din' was closed a second time here and 'sin' never
+                if (sin != null) sin.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            // release both exec channels; they were never disconnected before
+            if (targetChannel != null) {
+                targetChannel.disconnect();
+            }
+            if (sourceChannel != null) {
+                sourceChannel.disconnect();
+            }
+        }
+    }
+
+       /**
+        * Creates a directory, including missing parents, on the remote host by
+        * running {@code mkdir -p} over an exec channel.
+        *
+        * @param path    directory to create; it is appended unquoted to the command, so
+        *                callers must not pass untrusted input
+        * @param session JSch session used to open the exec channel
+        * @throws IOException     declared for callers of the channel helpers
+        * @throws JSchException   if the exec channel cannot be opened or connected
+        * @throws WorkerException if the remote error output contains {@code "mkdir:"}
+        */
+       public static void makeDirectory(String path, Session session) throws IOException, JSchException, WorkerException {
+
+               // exec 'mkdir -p path' remotely
+               String command = "mkdir -p " + path;
+               Channel channel = session.openChannel("exec");
+               StandardOutReader stdOutReader = new StandardOutReader();
+
+               ((ChannelExec) channel).setCommand(command);
+               ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+               try {
+                       try {
+                               channel.connect();
+                       } catch (JSchException e) {
+                               log.error("Unable to retrieve command output. Command - " + command +
+                                               " on server - " + session.getHost() + ":" + session.getPort() +
+                                               " connecting user name - "
+                                               + session.getUserName());
+                               throw e;
+                       }
+                       stdOutReader.onOutput(channel);
+                       if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+                               throw new WorkerException(stdOutReader.getStdErrorString());
+                       }
+               } finally {
+                       // Disconnect on every path; previously the channel stayed open when
+                       // the remote mkdir reported an error.
+                       channel.disconnect();
+               }
+       }
+
+       /**
+        * Lists a remote path by running {@code ls} over an exec channel.
+        *
+        * @param path    path to list; it is appended unquoted to the command, so callers
+        *                must not pass untrusted input
+        * @param session JSch session used to open the exec channel
+        * @return the standard output of {@code ls}, split at line feeds
+        * @throws IOException     declared for callers of the channel helpers
+        * @throws JSchException   if the exec channel cannot be opened
+        * @throws WorkerException if the channel cannot be connected or the remote error
+        *                         output contains {@code "ls:"}
+        */
+       public static List<String> listDirectory(String path, Session session) throws IOException, JSchException,
+                       WorkerException {
+
+               // exec 'ls path' remotely
+               String command = "ls " + path;
+               Channel channel = session.openChannel("exec");
+               StandardOutReader stdOutReader = new StandardOutReader();
+
+               ((ChannelExec) channel).setCommand(command);
+               ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+               try {
+                       try {
+                               channel.connect();
+                       } catch (JSchException e) {
+                               throw new WorkerException("Unable to retrieve command output. Command - " + command +
+                                               " on server - " + session.getHost() + ":" + session.getPort() +
+                                               " connecting user name - "
+                                               + session.getUserName(), e);
+                       }
+                       stdOutReader.onOutput(channel);
+                       // NOTE(review): result discarded in the original code as well; kept in
+                       // case the reader's getter has side effects - confirm and remove.
+                       stdOutReader.getStdOutputString();
+                       if (stdOutReader.getStdErrorString().contains("ls:")) {
+                               throw new WorkerException(stdOutReader.getStdErrorString());
+                       }
+               } finally {
+                       // Disconnect on every path; previously the channel stayed open when
+                       // the remote ls reported an error.
+                       channel.disconnect();
+               }
+               return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+       }
+
+
+       /**
+        * Reads one scp acknowledgement from the stream. For an error (1) or fatal
+        * error (2) the message that follows, up to and including the line feed, is
+        * read as well and logged as a warning.
+        *
+        * @param in stream connected to the remote scp process
+        * @return 0 for success, 1 for error, 2 for fatal error, -1 at end of stream;
+        *         any other byte is returned as read
+        * @throws IOException if reading from the stream fails
+        */
+       static int checkAck(InputStream in) throws IOException {
+               int b = in.read();
+               if (b == 1 || b == 2) {
+                       StringBuilder message = new StringBuilder();
+                       int c;
+                       // Stop at end of stream as well: the old do/while only stopped at
+                       // '\n' and appended forever once read() kept returning -1.
+                       while ((c = in.read()) != -1) {
+                               message.append((char) c);
+                               if (c == '\n') {
+                                       break;
+                               }
+                       }
+                       // The message used to be printed to System.out too; the logger is
+                       // the single reporting channel now.
+                       log.warn(message.toString());
+               }
+               return b;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
index 75a8062..b5ce833 100644
--- 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
@@ -36,6 +36,7 @@ import 
org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.worker.core.cluster.ServerInfo;
 import org.apache.airavata.worker.core.config.ResourceConfig;
 import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.core.context.ProcessContext;
 import org.apache.airavata.worker.core.exceptions.WorkerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +90,17 @@ public class WorkerFactory {
         return resources.get(resourceJobManagerType);
     }
 
+    /**
+     * Builds the SSH key authentication for the storage resource of a process
+     * by delegating to {@code getSshKeyAuthentication} with the gateway id,
+     * storage login user name and storage credential token taken from the
+     * process context.
+     *
+     * @param pc process context supplying the gateway id and storage credentials
+     * @return the authentication object produced by {@code getSshKeyAuthentication}
+     * @throws WorkerException if the delegate fails with an application settings,
+     *         illegal access or instantiation error (the cause is preserved)
+     * @throws CredentialStoreException declared by the delegate and not translated here
+     */
+    public static SSHKeyAuthentication getStorageSSHKeyAuthentication(ProcessContext pc)
+            throws WorkerException, CredentialStoreException {
+        try {
+            String gatewayId = pc.getGatewayId();
+            String storageLoginUser = pc.getStorageResourceLoginUserName();
+            String storageCredentialToken = pc.getStorageResourceCredentialToken();
+            return getSshKeyAuthentication(gatewayId, storageLoginUser, storageCredentialToken);
+        } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+            throw new WorkerException("Couldn't build ssh authentication object", e);
+        }
+    }
+
     public static SSHKeyAuthentication getSshKeyAuthentication(String 
gatewayId,
                                                                 String 
loginUserName,
                                                                 String 
credentialStoreToken)

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
index a53d736..663428a 100644
--- 
a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
+++ 
b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
@@ -8,10 +8,14 @@ import 
org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.messaging.core.MessageContext;
 import 
org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.status.*;
+import 
org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.airavata.worker.core.context.ProcessContext;
@@ -20,6 +24,9 @@ import 
org.apache.airavata.worker.core.exceptions.WorkerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -232,4 +239,61 @@ public class WorkerUtils {
             return null;
         }
     }
+
+    /**
+     * Records the value of a named experiment output.
+     * <p>
+     * For every output of the experiment whose name equals {@code outputName},
+     * a FILE data product with a single transient gateway-data-store replica
+     * pointing at {@code outputVal} is registered in the replica catalog, and
+     * the returned product URI becomes the output's value. The experiment is
+     * then written back to the experiment catalog, whether or not any output
+     * matched.
+     *
+     * @param processContext context supplying the catalog, ids, owner and storage resource
+     * @param outputName     name of the experiment output to update
+     * @param outputVal      file path stored as the replica location
+     * @throws WorkerException if a registry operation fails (the cause is preserved)
+     */
+    public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal)
+            throws WorkerException {
+        try {
+            ExperimentCatalog catalog = processContext.getExperimentCatalog();
+            String expId = processContext.getExperimentId();
+            ExperimentModel experimentModel =
+                    (ExperimentModel) catalog.get(ExperimentCatalogModelType.EXPERIMENT, expId);
+            List<OutputDataObjectType> outputs = experimentModel.getExperimentOutputs();
+            if (outputs != null) {
+                for (OutputDataObjectType candidate : outputs) {
+                    if (!candidate.getName().equals(outputName)) {
+                        continue;
+                    }
+
+                    // Describe the output file as a data product owned by the process user.
+                    DataProductModel product = new DataProductModel();
+                    product.setGatewayId(processContext.getGatewayId());
+                    product.setOwnerName(processContext.getProcessModel().getUserName());
+                    product.setProductName(outputName);
+                    product.setDataProductType(DataProductType.FILE);
+
+                    // Single replica: the copy held on the process's storage resource.
+                    DataReplicaLocationModel replica = new DataReplicaLocationModel();
+                    replica.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+                    replica.setReplicaName(outputName + " gateway data store copy");
+                    replica.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+                    replica.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+                    replica.setFilePath(outputVal);
+                    product.addToReplicaLocations(replica);
+
+                    // The output value becomes the URI handed back by the replica catalog.
+                    ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+                    String registeredUri = replicaCatalog.registerDataProduct(product);
+                    candidate.setValue(registeredUri);
+                }
+            }
+            catalog.update(ExperimentCatalogModelType.EXPERIMENT, experimentModel, expId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating experiment outputs";
+            throw new WorkerException(msg, e);
+        }
+    }
+
+    /**
+     * Builds the destination URI of a file on the given host.
+     * <p>
+     * The path is chosen as follows:
+     * <ul>
+     *   <li>no experiment data dir set: {@code inputPath + processId + separator + fileName};</li>
+     *   <li>experiment data dir starts with the file separator: the data dir
+     *       (with a trailing separator added if missing) followed by {@code fileName};</li>
+     *   <li>otherwise: the same, prefixed with {@code inputPath}.</li>
+     * </ul>
+     * {@code inputPath} is concatenated as-is; no separator is inserted after it.
+     *
+     * @param taskContext task whose parent process supplies the data dir, process id and login user
+     * @param hostName    host component of the URI
+     * @param inputPath   prefix for paths that are not rooted at the file separator
+     * @param fileName    name of the file appended to the directory
+     * @return a URI with scheme {@code file}, the storage login user as user info and port 22
+     * @throws URISyntaxException if the assembled components do not form a valid URI
+     */
+    public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName)
+            throws URISyntaxException {
+        String dataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+        String destinationPath;
+        if (dataDir == null || dataDir.isEmpty()) {
+            // No experiment data dir: fall back to a per-process directory under inputPath.
+            destinationPath = inputPath + taskContext.getParentProcessContext().getProcessId()
+                    + File.separator + fileName;
+        } else {
+            String dirWithSeparator = dataDir.endsWith(File.separator) ? dataDir : dataDir + File.separator;
+            destinationPath = dirWithSeparator.startsWith(File.separator)
+                    ? dirWithSeparator + fileName
+                    : inputPath + dirWithSeparator + fileName;
+        }
+        // FIXME (carried over from the original; presumably about the hard-coded
+        // "file" scheme and port 22 - confirm with the author)
+        String loginUser = taskContext.getParentProcessContext().getStorageResourceLoginUserName();
+        return new URI("file", loginUser, hostName, 22, destinationPath, null, null);
+
+    }
 }

Reply via email to