This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 05a422a Adding capability for adaptors to support non default ssh ports
05a422a is described below
commit 05a422ade51e1816306babc1df721fdb951bcde5
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu Mar 28 18:41:27 2019 -0400
Adding capability for adaptors to support non default ssh ports
---
.../airavata/helix/adaptor/PoolingSSHJClient.java | 2 +-
.../airavata/helix/adaptor/SSHJAgentAdaptor.java | 35 ++++++++++++++++++----
.../airavata/helix/adaptor/SSHJStorageAdaptor.java | 29 ++++++++++++++++--
.../impl/task/staging/OutputDataStagingTask.java | 2 +-
4 files changed, 57 insertions(+), 11 deletions(-)
diff --git
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 75ab626..244536e 100644
---
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -245,7 +245,7 @@ public class PoolingSSHJClient extends SSHClient {
sshClient.addHostKeyVerifier(hostKeyVerifier);
}
- sshClient.connect(host);
+ sshClient.connect(host, port);
sshClient.getConnection().getKeepAlive().setKeepAliveInterval(5);
//send keep alive signal every 5sec
diff --git
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 4349844..f10ba53 100644
---
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -38,6 +38,9 @@ import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
import org.apache.airavata.helix.agent.ssh.StandardOutReader;
import
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.credential.store.SSHCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,12 +58,12 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
private PoolingSSHJClient sshjClient;
- protected void createPoolingSSHJClient(String user, String host, String
publicKey, String privateKey, String passphrase) throws IOException {
+ protected void createPoolingSSHJClient(String user, String host, int port,
String publicKey, String privateKey, String passphrase) throws IOException {
DefaultConfig defaultConfig = new DefaultConfig();
defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
- sshjClient = new PoolingSSHJClient(defaultConfig, host, 22);
- sshjClient.addHostKeyVerifier((hostname, port, key) -> true);
+ sshjClient = new PoolingSSHJClient(defaultConfig, host, port == 0 ? 22
: port);
+ sshjClient.addHostKeyVerifier((h, p, key) -> true);
sshjClient.setMaxSessionsForConnection(1);
@@ -96,9 +99,9 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
sshjClient.auth(user, am);
}
- public void init(String user, String host, String publicKey, String
privateKey, String passphrase) throws AgentException {
+ public void init(String user, String host, int port, String publicKey,
String privateKey, String passphrase) throws AgentException {
try {
- createPoolingSSHJClient(user, host, publicKey, privateKey,
passphrase);
+ createPoolingSSHJClient(user, host, port, publicKey, privateKey,
passphrase);
} catch (IOException e) {
logger.error("Error while initializing sshj agent for user " +
user + " host " + host + " for key starting with " + publicKey.substring(0,10),
e);
throw new AgentException("Error while initializing sshj agent for
user " + user + " host " + host + " for key starting with " +
publicKey.substring(0,10), e);
@@ -108,17 +111,37 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
@Override
public void init(String computeResource, String gatewayId, String userId,
String token) throws AgentException {
try {
+ logger.info("Initializing Compute Resource SSH Adaptor for compute
resource : "+ computeResource + ", gateway : " +
+ gatewayId +", user " + userId + ", token : " + token);
+
ComputeResourceDescription computeResourceDescription =
AgentUtils.getRegistryServiceClient().getComputeResource(computeResource);
+ logger.info("Fetching job submission interfaces for compute
resource " + computeResource);
+
+ Optional<JobSubmissionInterface> jobSubmissionInterfaceOp =
computeResourceDescription.getJobSubmissionInterfaces()
+ .stream().filter(iface -> iface.getJobSubmissionProtocol()
== JobSubmissionProtocol.SSH).findFirst();
+
+ JobSubmissionInterface sshInterface = jobSubmissionInterfaceOp
+ .orElseThrow(() -> new AgentException("Could not find a
SSH interface for compute resource " + computeResource));
+
+ SSHJobSubmission sshJobSubmission =
AgentUtils.getRegistryServiceClient().getSSHJobSubmission(sshInterface.getJobSubmissionInterfaceId());
+
logger.info("Fetching credentials for cred store token " + token);
SSHCredential sshCredential =
AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
+
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}
logger.info("Description for token : " + token + " : " +
sshCredential.getDescription());
- createPoolingSSHJClient(userId,
computeResourceDescription.getHostName(),
+ String alternateHostName =
sshJobSubmission.getAlternativeSSHHostName();
+ String selectedHostName = (alternateHostName == null ||
"".equals(alternateHostName))?
+ computeResourceDescription.getHostName() :
alternateHostName;
+
+ int selectedPort = sshJobSubmission.getSshPort() == 0 ? 22 :
sshJobSubmission.getSshPort();
+
+ createPoolingSSHJClient(userId, selectedHostName, selectedPort,
sshCredential.getPublicKey(),
sshCredential.getPrivateKey(), sshCredential.getPassphrase());
} catch (Exception e) {
diff --git
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 811cac2..0cc7ff9 100644
---
a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
+++
b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -25,9 +25,14 @@ import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.credential.store.SSHCredential;
+import org.apache.airavata.model.data.movement.DataMovementInterface;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.data.movement.SCPDataMovement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
+
public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements
StorageResourceAdaptor {
private final static Logger logger =
LoggerFactory.getLogger(SSHJAgentAdaptor.class);
@@ -35,9 +40,20 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor
implements StorageResou
@Override
public void init(String storageResourceId, String gatewayId, String
loginUser, String token) throws AgentException {
try {
- logger.info("Initializing Storage Resource Adaptor for storage
resource : "+ storageResourceId + ", gateway : " +
+ logger.info("Initializing Storage Resource SCP Adaptor for storage
resource : "+ storageResourceId + ", gateway : " +
gatewayId +", user " + loginUser + ", token : " + token);
- StorageResourceDescription storageResource =
AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
+
+ StorageResourceDescription storageResourceDescription =
AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
+
+ logger.info("Fetching data movement interfaces for storage
resource " + storageResourceId);
+
+ Optional<DataMovementInterface> dmInterfaceOp =
storageResourceDescription.getDataMovementInterfaces()
+ .stream().filter(iface -> iface.getDataMovementProtocol()
== DataMovementProtocol.SCP).findFirst();
+
+ DataMovementInterface scpInterface = dmInterfaceOp
+ .orElseThrow(() -> new AgentException("Could not find a
SCP interface for storage resource " + storageResourceId));
+
+ SCPDataMovement scpDataMovement =
AgentUtils.getRegistryServiceClient().getSCPDataMovement(scpInterface.getDataMovementInterfaceId());
logger.info("Fetching credentials for cred store token " + token);
@@ -45,9 +61,16 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor
implements StorageResou
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}
+
logger.info("Description for token : " + token + " : " +
sshCredential.getDescription());
- createPoolingSSHJClient(loginUser, storageResource.getHostName(),
sshCredential.getPublicKey(),
+ String alternateHostName =
scpDataMovement.getAlternativeSCPHostName();
+ String selectedHostName = (alternateHostName == null ||
"".equals(alternateHostName))?
+ storageResourceDescription.getHostName() :
alternateHostName;
+
+ int selectedPort = scpDataMovement.getSshPort() == 0 ? 22 :
scpDataMovement.getSshPort();
+
+ createPoolingSSHJClient(loginUser, selectedHostName, selectedPort,
sshCredential.getPublicKey(),
sshCredential.getPrivateKey(),
sshCredential.getPassphrase());
} catch (Exception e) {
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 1f66a1a..419ff02 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -167,7 +167,7 @@ public class OutputDataStagingTask extends DataStagingTask {
return onSuccess("Output data staging task " + getTaskId() + "
successfully completed");
} else {
- // Downloading input file from the storage resource
+ // Uploading output file to the storage resource
assert processOutput != null;
boolean transferred =
transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(),
sourceFileName, adaptor, storageResourceAdaptor);
if (transferred) {