This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 05a422a  Adding capability for adaptors to support non default ssh 
ports
05a422a is described below

commit 05a422ade51e1816306babc1df721fdb951bcde5
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu Mar 28 18:41:27 2019 -0400

    Adding capability for adaptors to support non default ssh ports
---
 .../airavata/helix/adaptor/PoolingSSHJClient.java  |  2 +-
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 35 ++++++++++++++++++----
 .../airavata/helix/adaptor/SSHJStorageAdaptor.java | 29 ++++++++++++++++--
 .../impl/task/staging/OutputDataStagingTask.java   |  2 +-
 4 files changed, 57 insertions(+), 11 deletions(-)

diff --git 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 75ab626..244536e 100644
--- 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -245,7 +245,7 @@ public class PoolingSSHJClient extends SSHClient {
             sshClient.addHostKeyVerifier(hostKeyVerifier);
         }
 
-        sshClient.connect(host);
+        sshClient.connect(host, port);
 
         sshClient.getConnection().getKeepAlive().setKeepAliveInterval(5); 
//send keep alive signal every 5sec
 
diff --git 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 4349844..f10ba53 100644
--- 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -38,6 +38,9 @@ import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
 import org.apache.airavata.helix.agent.ssh.StandardOutReader;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
 import org.apache.airavata.model.credential.store.SSHCredential;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +58,12 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     private PoolingSSHJClient sshjClient;
 
-    protected void createPoolingSSHJClient(String user, String host, String 
publicKey, String privateKey, String passphrase) throws IOException {
+    protected void createPoolingSSHJClient(String user, String host, int port, 
String publicKey, String privateKey, String passphrase) throws IOException {
         DefaultConfig defaultConfig = new DefaultConfig();
         defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
 
-        sshjClient = new PoolingSSHJClient(defaultConfig, host, 22);
-        sshjClient.addHostKeyVerifier((hostname, port, key) -> true);
+        sshjClient = new PoolingSSHJClient(defaultConfig, host, port == 0 ? 22 
: port);
+        sshjClient.addHostKeyVerifier((h, p, key) -> true);
 
         sshjClient.setMaxSessionsForConnection(1);
 
@@ -96,9 +99,9 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
         sshjClient.auth(user, am);
     }
 
-    public void init(String user, String host, String publicKey, String 
privateKey, String passphrase) throws AgentException {
+    public void init(String user, String host, int port, String publicKey, 
String privateKey, String passphrase) throws AgentException {
         try {
-            createPoolingSSHJClient(user, host, publicKey, privateKey, 
passphrase);
+            createPoolingSSHJClient(user, host, port, publicKey, privateKey, 
passphrase);
         } catch (IOException e) {
             logger.error("Error while initializing sshj agent for user " + 
user + " host " + host + " for key starting with " + publicKey.substring(0,10), 
e);
             throw new AgentException("Error while initializing sshj agent for 
user " + user + " host " + host + " for key starting with " + 
publicKey.substring(0,10), e);
@@ -108,17 +111,37 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
     @Override
     public void init(String computeResource, String gatewayId, String userId, 
String token) throws AgentException {
         try {
+            logger.info("Initializing Compute Resource SSH Adaptor for compute 
resource : "+ computeResource + ", gateway : " +
+                    gatewayId +", user " + userId + ", token : " + token);
+
             ComputeResourceDescription computeResourceDescription = 
AgentUtils.getRegistryServiceClient().getComputeResource(computeResource);
 
+            logger.info("Fetching job submission interfaces for compute 
resource " + computeResource);
+
+            Optional<JobSubmissionInterface> jobSubmissionInterfaceOp = 
computeResourceDescription.getJobSubmissionInterfaces()
+                    .stream().filter(iface -> iface.getJobSubmissionProtocol() 
== JobSubmissionProtocol.SSH).findFirst();
+
+            JobSubmissionInterface sshInterface = jobSubmissionInterfaceOp
+                    .orElseThrow(() -> new AgentException("Could not find a 
SSH interface for compute resource " + computeResource));
+
+            SSHJobSubmission sshJobSubmission = 
AgentUtils.getRegistryServiceClient().getSSHJobSubmission(sshInterface.getJobSubmissionInterfaceId());
+
             logger.info("Fetching credentials for cred store token " + token);
 
             SSHCredential sshCredential = 
AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
+
             if (sshCredential == null) {
                 throw new AgentException("Null credential for token " + token);
             }
             logger.info("Description for token : " + token + " : " + 
sshCredential.getDescription());
 
-            createPoolingSSHJClient(userId, 
computeResourceDescription.getHostName(),
+            String alternateHostName = 
sshJobSubmission.getAlternativeSSHHostName();
+            String selectedHostName = (alternateHostName == null || 
"".equals(alternateHostName))?
+                    computeResourceDescription.getHostName() : 
alternateHostName;
+
+            int selectedPort = sshJobSubmission.getSshPort() == 0 ? 22 : 
sshJobSubmission.getSshPort();
+
+            createPoolingSSHJClient(userId, selectedHostName, selectedPort,
                     sshCredential.getPublicKey(), 
sshCredential.getPrivateKey(), sshCredential.getPassphrase());
 
         } catch (Exception e) {
diff --git 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 811cac2..0cc7ff9 100644
--- 
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
+++ 
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -25,9 +25,14 @@ import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.credential.store.SSHCredential;
+import org.apache.airavata.model.data.movement.DataMovementInterface;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.data.movement.SCPDataMovement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements 
StorageResourceAdaptor {
 
     private final static Logger logger = 
LoggerFactory.getLogger(SSHJAgentAdaptor.class);
@@ -35,9 +40,20 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor 
implements StorageResou
     @Override
     public void init(String storageResourceId, String gatewayId, String 
loginUser, String token) throws AgentException {
         try {
-            logger.info("Initializing Storage Resource Adaptor for storage 
resource : "+ storageResourceId + ", gateway : " +
+            logger.info("Initializing Storage Resource SCP Adaptor for storage 
resource : "+ storageResourceId + ", gateway : " +
                     gatewayId +", user " + loginUser + ", token : " + token);
-            StorageResourceDescription storageResource = 
AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
+
+            StorageResourceDescription storageResourceDescription = 
AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
+
+            logger.info("Fetching data movement interfaces for storage 
resource " + storageResourceId);
+
+            Optional<DataMovementInterface> dmInterfaceOp = 
storageResourceDescription.getDataMovementInterfaces()
+                    .stream().filter(iface -> iface.getDataMovementProtocol() 
== DataMovementProtocol.SCP).findFirst();
+
+            DataMovementInterface scpInterface = dmInterfaceOp
+                    .orElseThrow(() -> new AgentException("Could not find a 
SCP interface for storage resource " + storageResourceId));
+
+            SCPDataMovement scpDataMovement = 
AgentUtils.getRegistryServiceClient().getSCPDataMovement(scpInterface.getDataMovementInterfaceId());
 
             logger.info("Fetching credentials for cred store token " + token);
 
@@ -45,9 +61,16 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor 
implements StorageResou
             if (sshCredential == null) {
                 throw new AgentException("Null credential for token " + token);
             }
+
             logger.info("Description for token : " + token + " : " + 
sshCredential.getDescription());
 
-            createPoolingSSHJClient(loginUser, storageResource.getHostName(), 
sshCredential.getPublicKey(),
+            String alternateHostName = 
scpDataMovement.getAlternativeSCPHostName();
+            String selectedHostName = (alternateHostName == null || 
"".equals(alternateHostName))?
+                    storageResourceDescription.getHostName() : 
alternateHostName;
+
+            int selectedPort = scpDataMovement.getSshPort() == 0 ? 22 : 
scpDataMovement.getSshPort();
+
+            createPoolingSSHJClient(loginUser, selectedHostName, selectedPort, 
sshCredential.getPublicKey(),
                     sshCredential.getPrivateKey(), 
sshCredential.getPassphrase());
 
         } catch (Exception e) {
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 1f66a1a..419ff02 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -167,7 +167,7 @@ public class OutputDataStagingTask extends DataStagingTask {
                 return onSuccess("Output data staging task " + getTaskId() + " 
successfully completed");
 
             } else {
-                // Downloading input file from the storage resource
+                // Uploading output file to the storage resource
                 assert processOutput != null;
                 boolean transferred = 
transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), 
sourceFileName, adaptor, storageResourceAdaptor);
                 if (transferred) {

Reply via email to