This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch service-layer-improvements in repository https://gitbox.apache.org/repos/asf/airavata.git
commit f5bf7c9fcc2cee8483e6e45fb8bb94340812f8d7 Author: yasithdev <[email protected]> AuthorDate: Tue Dec 9 18:20:36 2025 -0600 cleanup wrappers --- .../airavata/helix/adaptor/PoolingSSHJClient.java | 175 ++++++++++++++----- .../airavata/helix/adaptor/SSHJAgentAdaptor.java | 138 +++++++-------- .../adaptor/wrapper/SCPFileTransferWrapper.java | 106 ----------- .../helix/adaptor/wrapper/SFTPClientWrapper.java | 50 ------ .../helix/adaptor/wrapper/SSHClientWrapper.java | 44 ----- .../helix/adaptor/wrapper/SessionWrapper.java | 194 --------------------- 6 files changed, 197 insertions(+), 510 deletions(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java index 5952e4e14d..b9bb039f7b 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java @@ -36,10 +36,8 @@ import net.schmizz.sshj.transport.TransportException; import net.schmizz.sshj.transport.verification.HostKeyVerifier; import net.schmizz.sshj.userauth.UserAuthException; import net.schmizz.sshj.userauth.method.AuthMethod; -import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper; -import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper; -import org.apache.airavata.helix.adaptor.wrapper.SSHClientWrapper; -import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper; +import net.schmizz.sshj.connection.channel.direct.Session; +import net.schmizz.sshj.xfer.scp.SCPFileTransfer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +52,8 @@ public class PoolingSSHJClient extends SSHClient { private static final Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class); private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final Map<SSHClientWrapper, SSHClientInfo> clientInfoMap = new HashMap<>(); + private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>(); + private final Map<SSHClient, Boolean> clientErrorMap = new HashMap<>(); private HostKeyVerifier hostKeyVerifier; private String username; @@ -100,8 +99,8 @@ public class PoolingSSHJClient extends SSHClient { ////////////////// client specific operations /////// - private SSHClientWrapper newClientWithSessionValidation() throws IOException { - SSHClientWrapper newClient = createNewSSHClient(); + private SSHClient newClientWithSessionValidation() throws IOException { + SSHClient newClient = createNewSSHClient(); SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(), clientInfoMap.size()); clientInfoMap.put(newClient, info); @@ -137,7 +136,7 @@ public class PoolingSSHJClient extends SSHClient { return newClient; } - private SSHClientWrapper leaseSSHClient() throws Exception { + private SSHClient leaseSSHClient() throws Exception { lock.writeLock().lock(); try { @@ -146,10 +145,10 @@ public class PoolingSSHJClient extends SSHClient { } else { - Optional<Map.Entry<SSHClientWrapper, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream() + Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream() .min(Comparator.comparing(entry -> entry.getValue().sessionCount)); if (minEntryOp.isPresent()) { - Map.Entry<SSHClientWrapper, SSHClientInfo> minEntry = minEntryOp.get(); + Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get(); // use the connection with least amount of sessions created. logger.debug( @@ -176,9 +175,9 @@ public class PoolingSSHJClient extends SSHClient { minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1); minEntry.getValue().setLastAccessedTime(System.currentTimeMillis()); - SSHClientWrapper sshClient = minEntry.getKey(); + SSHClient sshClient = minEntry.getKey(); - if (!sshClient.isConnected() || !sshClient.isAuthenticated() || sshClient.isErrored()) { + if (!sshClient.isConnected() || !sshClient.isAuthenticated() || isClientErrored(sshClient)) { logger.warn( "Client for host {} is not connected or not authenticated. Creating a new client", host); @@ -198,14 +197,14 @@ public class PoolingSSHJClient extends SSHClient { } } - private void removeDisconnectedClients(SSHClientWrapper client, boolean doDisconnect) { + private void removeDisconnectedClients(SSHClient client, boolean doDisconnect) { lock.writeLock().lock(); if (doDisconnect) { try { client.disconnect(); } catch (Exception e) { - log.warn("Errored while disconnecting the client " + e.getMessage()); + logger.warn("Errored while disconnecting the client " + e.getMessage()); // Ignore } } @@ -217,6 +216,7 @@ public class PoolingSSHJClient extends SSHClient { clientInfoMap.get(client).getClientId(), host); clientInfoMap.remove(client); + clientErrorMap.remove(client); } } finally { @@ -224,7 +224,7 @@ public class PoolingSSHJClient extends SSHClient { } } - private void untrackClosedSessions(SSHClientWrapper client, int sessionId) { + public void untrackClosedSessions(SSHClient client, int sessionId) { lock.writeLock().lock(); try { @@ -243,7 +243,7 @@ public class PoolingSSHJClient extends SSHClient { } private void removeStaleConnections() { - List<Map.Entry<SSHClientWrapper, SSHClientInfo>> entriesTobeRemoved; + List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved; lock.writeLock().lock(); logger.info("Current active connections for {} @ {} : {} are {}", username, host, port, clientInfoMap.size()); try { @@ -266,19 +266,20 @@ public class PoolingSSHJClient extends SSHClient { entriesTobeRemoved.forEach(entry -> { try { entry.getKey().disconnect(); + clientErrorMap.remove(entry.getKey()); } catch (IOException e) { logger.warn("Failed to disconnect connection {} for host {}", entry.getValue().clientId, host); } }); } - private SSHClientWrapper createNewSSHClient() throws IOException { + private SSHClient createNewSSHClient() throws IOException { - SSHClientWrapper sshClient; + SSHClient sshClient; if (config != null) { - sshClient = new SSHClientWrapper(config); + sshClient = new SSHClient(config); } else { - sshClient = new SSHClientWrapper(); + sshClient = new SSHClient(); } sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() { @@ -304,70 +305,150 @@ public class PoolingSSHJClient extends SSHClient { return sshClient; } - public SessionWrapper startSessionWrapper() throws Exception { + public static class SessionResource { + private final Session session; + private final SSHClient sshClient; + private final PoolingSSHJClient pool; - final SSHClientWrapper sshClient = leaseSSHClient(); + SessionResource(Session session, SSHClient sshClient, PoolingSSHJClient pool) { + this.session = session; + this.sshClient = sshClient; + this.pool = pool; + } - try { - return new SessionWrapper( - sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id), sshClient); + public Session getSession() { + return session; + } + + public void close() throws Exception { + session.close(); + pool.untrackClosedSessions(sshClient, session.getID()); + } + + public void markErrored() { + pool.markClientErrored(sshClient); + } + } + + public static class SCPFileTransferResource { + private final SCPFileTransfer fileTransfer; + private final SSHClient sshClient; + private final PoolingSSHJClient pool; + + SCPFileTransferResource(SCPFileTransfer fileTransfer, SSHClient sshClient, PoolingSSHJClient pool) { + this.fileTransfer = fileTransfer; + this.sshClient = sshClient; + this.pool = pool; + } + + public SCPFileTransfer getFileTransfer() { + return fileTransfer; + } + public void close() throws Exception { + // SCPFileTransfer doesn't need explicit closing, just track the session + pool.untrackClosedSessions(sshClient, -1); + } + + public void markErrored() { + pool.markClientErrored(sshClient); + } + } + + public static class SFTPClientResource { + private final SFTPClient sftpClient; + private final SSHClient sshClient; + private final PoolingSSHJClient pool; + + SFTPClientResource(SFTPClient sftpClient, SSHClient sshClient, PoolingSSHJClient pool) { + this.sftpClient = sftpClient; + this.sshClient = sshClient; + this.pool = pool; + } + + public SFTPClient getSFTPClient() { + return sftpClient; + } + + public void close() throws Exception { + sftpClient.close(); + pool.untrackClosedSessions(sshClient, -1); + } + + public void markErrored() { + pool.markClientErrored(sshClient); + } + } + + public SessionResource startSessionResource() throws Exception { + final SSHClient sshClient = leaseSSHClient(); + try { + Session session = sshClient.startSession(); + return new SessionResource(session, sshClient, this); } catch (Exception e) { if (sshClient != null) { // If it is a ConnectionExceptions, explicitly invalidate the client if (e instanceof ConnectionException) { - sshClient.setErrored(true); + markClientErrored(sshClient); } - untrackClosedSessions(sshClient, -1); } throw e; } } - public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception { - - final SSHClientWrapper sshClient = leaseSSHClient(); - + public SCPFileTransferResource newSCPFileTransferResource() throws Exception { + final SSHClient sshClient = leaseSSHClient(); try { - return new SCPFileTransferWrapper( - sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id), sshClient); - + SCPFileTransfer fileTransfer = sshClient.newSCPFileTransfer(); + return new SCPFileTransferResource(fileTransfer, sshClient, this); } catch (Exception e) { - if (sshClient != null) { // If it is a ConnectionExceptions, explicitly invalidate the client if (e instanceof ConnectionException) { - sshClient.setErrored(true); + markClientErrored(sshClient); } - untrackClosedSessions(sshClient, -1); } throw e; } } - public SFTPClientWrapper newSFTPClientWrapper() throws Exception { - - final SSHClientWrapper sshClient = leaseSSHClient(); - + public SFTPClientResource newSFTPClientResource() throws Exception { + final SSHClient sshClient = leaseSSHClient(); try { - return new SFTPClientWrapper( - sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id), sshClient); + SFTPClient sftpClient = sshClient.newSFTPClient(); + return new SFTPClientResource(sftpClient, sshClient, this); } catch (Exception e) { - if (sshClient != null) { // If it is a ConnectionExceptions, explicitly invalidate the client if (e instanceof ConnectionException) { - sshClient.setErrored(true); + markClientErrored(sshClient); } - untrackClosedSessions(sshClient, -1); } throw e; } } + public void markClientErrored(SSHClient client) { + lock.writeLock().lock(); + try { + clientErrorMap.put(client, true); + } finally { + lock.writeLock().unlock(); + } + } + + private boolean isClientErrored(SSHClient client) { + lock.readLock().lock(); + try { + return clientErrorMap.getOrDefault(client, false); + } finally { + lock.readLock().unlock(); + } + } + public class SSHClientInfo { private int sessionCount; @@ -461,7 +542,7 @@ public class PoolingSSHJClient extends SSHClient { return this; } - public Map<SSHClientWrapper, SSHClientInfo> getClientInfoMap() { + public Map<SSHClient, SSHClientInfo> getClientInfoMap() { return clientInfoMap; } } diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java index 82b55aae9c..a138b6341a 100644 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java +++ b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java @@ -43,9 +43,9 @@ import net.schmizz.sshj.xfer.LocalDestFile; import net.schmizz.sshj.xfer.LocalFileFilter; import net.schmizz.sshj.xfer.LocalSourceFile; import org.apache.airavata.agents.api.*; -import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper; -import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper; -import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper; +import org.apache.airavata.helix.adaptor.PoolingSSHJClient.SessionResource; +import org.apache.airavata.helix.adaptor.PoolingSSHJClient.SCPFileTransferResource; +import org.apache.airavata.helix.adaptor.PoolingSSHJClient.SFTPClientResource; import org.apache.airavata.helix.agent.ssh.StandardOutReader; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; @@ -222,11 +222,11 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException { - SessionWrapper session = null; + SessionResource sessionResource = null; try { - session = sshjClient.startSessionWrapper(); + sessionResource = sshjClient.startSessionResource(); Session.Command exec = - session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command); + sessionResource.getSession().exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command); StandardOutReader standardOutReader = new StandardOutReader(); try { @@ -241,15 +241,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor { } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(session).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sessionResource).ifPresent(SessionResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(session).ifPresent(ss -> { + Optional.ofNullable(sessionResource).ifPresent(ss -> { try { ss.close(); - } catch (IOException e) { + } catch (Exception e) { // Ignore } }); @@ -263,25 +263,25 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public void createDirectory(String path, boolean recursive) throws AgentException { - SFTPClientWrapper sftpClient = null; + SFTPClientResource sftpClientResource = null; try { - sftpClient = sshjClient.newSFTPClientWrapper(); + sftpClientResource = sshjClient.newSFTPClientResource(); if (recursive) { - sftpClient.mkdirs(path); + sftpClientResource.getSFTPClient().mkdirs(path); } else { - sftpClient.mkdir(path); + sftpClientResource.getSFTPClient().mkdir(path); } } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(sftpClient).ifPresent(client -> { + Optional.ofNullable(sftpClientResource).ifPresent(client -> { try { client.close(); - } catch (IOException e) { + } catch (Exception e) { // Ignore } }); @@ -293,22 +293,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor { if (path == null || path.trim().isEmpty()) { throw new AgentException("Directory path cannot be null or empty"); } - SFTPClientWrapper sftpClient = null; + SFTPClientResource sftpClientResource = null; try { - sftpClient = sshjClient.newSFTPClientWrapper(); - sftpClient.rmdir(path); + sftpClientResource = sshjClient.newSFTPClientResource(); + sftpClientResource.getSFTPClient().rmdir(path); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored); } logger.error("Error while deleting directory {}", path, e); throw new AgentException(e); } finally { - Optional.ofNullable(sftpClient).ifPresent(client -> { + Optional.ofNullable(sftpClientResource).ifPresent(client -> { try { client.close(); - } catch (IOException e) { + } catch (Exception e) { // Ignore } }); @@ -317,22 +317,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public void uploadFile(String localFile, String remoteFile) throws AgentException { - SCPFileTransferWrapper fileTransfer = null; + SCPFileTransferResource fileTransferResource = null; try { - fileTransfer = sshjClient.newSCPFileTransferWrapper(); - fileTransfer.upload(localFile, remoteFile); + fileTransferResource = sshjClient.newSCPFileTransferResource(); + fileTransferResource.getFileTransfer().upload(localFile, remoteFile); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> { + Optional.ofNullable(fileTransferResource).ifPresent(resource -> { try { - scpFileTransferWrapper.close(); - } catch (IOException e) { + resource.close(); + } catch (Exception e) { // Ignore } }); @@ -341,11 +341,11 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException { - SCPFileTransferWrapper fileTransfer = null; + SCPFileTransferResource fileTransferResource = null; try { - fileTransfer = sshjClient.newSCPFileTransferWrapper(); - fileTransfer.upload( + fileTransferResource = sshjClient.newSCPFileTransferResource(); + fileTransferResource.getFileTransfer().upload( new LocalSourceFile() { @Override public String getName() { @@ -401,15 +401,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor { remoteFile); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> { + Optional.ofNullable(fileTransferResource).ifPresent(resource -> { try { - scpFileTransferWrapper.close(); - } catch (IOException e) { + resource.close(); + } catch (Exception e) { // Ignore } }); @@ -418,22 +418,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public void downloadFile(String remoteFile, String localFile) throws AgentException { - SCPFileTransferWrapper fileTransfer = null; + SCPFileTransferResource fileTransferResource = null; try { - fileTransfer = sshjClient.newSCPFileTransferWrapper(); - fileTransfer.download(remoteFile, localFile); + fileTransferResource = sshjClient.newSCPFileTransferResource(); + fileTransferResource.getFileTransfer().download(remoteFile, localFile); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> { + Optional.ofNullable(fileTransferResource).ifPresent(resource -> { try { - scpFileTransferWrapper.close(); - } catch (IOException e) { + resource.close(); + } catch (Exception e) { // Ignore } }); @@ -443,10 +443,10 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException { - SCPFileTransferWrapper fileTransfer = null; + SCPFileTransferResource fileTransferResource = null; try { - fileTransfer = sshjClient.newSCPFileTransferWrapper(); - fileTransfer.download(remoteFile, new LocalDestFile() { + fileTransferResource = sshjClient.newSCPFileTransferResource(); + fileTransferResource.getFileTransfer().download(remoteFile, new LocalDestFile() { @Override public long getLength() { return metadata.getSize(); @@ -488,15 +488,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor { }); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> { + Optional.ofNullable(fileTransferResource).ifPresent(resource -> { try { - scpFileTransferWrapper.close(); - } catch (IOException e) { + resource.close(); + } catch (Exception e) { // Ignore } }); @@ -505,22 +505,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public List<String> listDirectory(String path) throws AgentException { - SFTPClientWrapper sftpClient = null; + SFTPClientResource sftpClientResource = null; try { - sftpClient = sshjClient.newSFTPClientWrapper(); - List<RemoteResourceInfo> ls = sftpClient.ls(path); + sftpClientResource = sshjClient.newSFTPClientResource(); + List<RemoteResourceInfo> ls = sftpClientResource.getSFTPClient().ls(path); return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList()); } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(sftpClient).ifPresent(client -> { + Optional.ofNullable(sftpClientResource).ifPresent(client -> { try { client.close(); - } catch (IOException e) { + } catch (Exception e) { // Ignore } }); @@ -529,21 +529,21 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public Boolean doesFileExist(String filePath) throws AgentException { - SFTPClientWrapper sftpClient = null; + SFTPClientResource sftpClientResource = null; try { - sftpClient = sshjClient.newSFTPClientWrapper(); - return sftpClient.statExistence(filePath) != null; + sftpClientResource = sshjClient.newSFTPClientResource(); + return sftpClientResource.getSFTPClient().statExistence(filePath) != null; } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(sftpClient).ifPresent(client -> { + Optional.ofNullable(sftpClientResource).ifPresent(client -> { try { client.close(); - } catch (IOException e) { + } catch (Exception e) { // Ignore } }); @@ -579,10 +579,10 @@ public class SSHJAgentAdaptor implements AgentAdaptor { @Override public FileMetadata getFileMetadata(String remoteFile) throws AgentException { - SFTPClientWrapper sftpClient = null; + SFTPClientResource sftpClientResource = null; try { - sftpClient = sshjClient.newSFTPClientWrapper(); - FileAttributes stat = sftpClient.stat(remoteFile); + sftpClientResource = sshjClient.newSFTPClientResource(); + FileAttributes stat = sftpClientResource.getSFTPClient().stat(remoteFile); FileMetadata metadata = new FileMetadata(); metadata.setName(new File(remoteFile).getName()); metadata.setSize(stat.getSize()); @@ -591,15 +591,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor { return metadata; } catch (Exception e) { if (e instanceof ConnectionException) { - Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true)); + Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored); } throw new AgentException(e); } finally { - Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> { + Optional.ofNullable(sftpClientResource).ifPresent(resource -> { try { - scpFileTransferWrapper.close(); - } catch (IOException e) { + resource.close(); + } catch (Exception e) { // Ignore } }); diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java deleted file mode 100644 index 2b694f4a62..0000000000 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java +++ /dev/null @@ -1,106 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.helix.adaptor.wrapper; - -import java.io.Closeable; -import java.io.IOException; -import java.util.function.Consumer; -import net.schmizz.sshj.xfer.FileTransfer; -import net.schmizz.sshj.xfer.LocalDestFile; -import net.schmizz.sshj.xfer.LocalSourceFile; -import net.schmizz.sshj.xfer.TransferListener; -import net.schmizz.sshj.xfer.scp.SCPFileTransfer; - -public class SCPFileTransferWrapper implements FileTransfer, Closeable { - - private SCPFileTransfer scpFileTransfer; - private Consumer<Integer> onCloseFunction; - private SSHClientWrapper originalSSHClient; - - public SCPFileTransferWrapper( - SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) { - this.scpFileTransfer = scpFileTransfer; - this.onCloseFunction = onCloseFunction; - this.originalSSHClient = originalSSHClient; - } - - @Override - public void upload(String localPath, String remotePath) throws IOException { - scpFileTransfer.upload(localPath, remotePath); - } - - @Override - public void upload(String localPath, String remotePath, long byteOffset) throws IOException { - scpFileTransfer.upload(localPath, remotePath, byteOffset); - } - - @Override - public void download(String remotePath, String localPath) throws IOException { - scpFileTransfer.download(remotePath, localPath); - } - - @Override - public void download(String remotePath, String localPath, long byteOffset) throws IOException { - scpFileTransfer.download(remotePath, localPath, byteOffset); - } - - @Override - public void upload(LocalSourceFile localFile, String remotePath) throws IOException { - scpFileTransfer.upload(localFile, remotePath); - } - - @Override - public void upload(LocalSourceFile localFile, String remotePath, long byteOffset) throws IOException { - scpFileTransfer.upload(localFile, remotePath, byteOffset); - } - - @Override - public void download(String remotePath, LocalDestFile localFile) throws IOException { - scpFileTransfer.download(remotePath, localFile); - } - - @Override - public void download(String remotePath, LocalDestFile localFile, long byteOffset) throws IOException { - scpFileTransfer.download(remotePath, localFile, byteOffset); - } - - @Override - public TransferListener getTransferListener() { - return scpFileTransfer.getTransferListener(); - } - - @Override - public void setTransferListener(TransferListener listener) { - scpFileTransfer.setTransferListener(listener); - } - - @Override - public void close() throws IOException { - onCloseFunction.accept(-1); - } - - public boolean isErrored() { - return originalSSHClient.isErrored(); - } - - public void setErrored(boolean errored) { - this.originalSSHClient.setErrored(errored); - } -} diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java deleted file mode 100644 index 35fdfb58ad..0000000000 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.helix.adaptor.wrapper; - -import java.io.IOException; -import java.util.function.Consumer; -import net.schmizz.sshj.sftp.SFTPClient; - -public class SFTPClientWrapper extends SFTPClient { - private Consumer<Integer> onCloseFunction; - private SSHClientWrapper originalSSHClient; - - public SFTPClientWrapper( - SFTPClient sftpClient, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) { - super(sftpClient.getSFTPEngine()); - this.onCloseFunction = onCloseFunction; - this.originalSSHClient = originalSSHClient; - } - - @Override - public void close() throws IOException { - onCloseFunction.accept(-1); - super.close(); - } - - public boolean isErrored() { - return originalSSHClient.isErrored(); - } - - public void setErrored(boolean errored) { - this.originalSSHClient.setErrored(errored); - } -} diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java deleted file mode 100644 index 702be794bd..0000000000 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java +++ /dev/null @@ -1,44 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.helix.adaptor.wrapper; - -import net.schmizz.sshj.Config; -import net.schmizz.sshj.SSHClient; - -public class SSHClientWrapper extends SSHClient { - - public SSHClientWrapper() { - super(); - } - - public SSHClientWrapper(Config config) { - super(config); - } - - private boolean errored = false; - - public boolean isErrored() { - return errored; - } - - public void setErrored(boolean errored) { - this.errored = errored; - } -} diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java deleted file mode 100644 index 8b50d3fbaf..0000000000 --- a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java +++ /dev/null @@ -1,194 +0,0 @@ -/** -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.airavata.helix.adaptor.wrapper; - -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import net.schmizz.sshj.common.LoggerFactory; -import net.schmizz.sshj.common.Message; -import net.schmizz.sshj.common.SSHException; -import net.schmizz.sshj.common.SSHPacket; -import net.schmizz.sshj.connection.ConnectionException; -import net.schmizz.sshj.connection.channel.direct.PTYMode; -import net.schmizz.sshj.connection.channel.direct.Session; -import net.schmizz.sshj.transport.TransportException; - -public class SessionWrapper implements Session { - - private Session session; - private Consumer<Integer> onCloseFunction; - private SSHClientWrapper originalSSHClient; - - public SessionWrapper(Session session, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) { - this.session = session; - this.onCloseFunction = onCloseFunction; - this.originalSSHClient = originalSSHClient; - } - - @Override - public void allocateDefaultPTY() throws ConnectionException, TransportException { - session.allocateDefaultPTY(); - } - - @Override - public void allocatePTY(String term, int cols, int rows, int width, int height, Map<PTYMode, Integer> modes) - throws ConnectionException, TransportException { - session.allocatePTY(term, cols, rows, width, height, modes); - } - - @Override - public Command exec(String command) throws ConnectionException, TransportException { - return session.exec(command); - } - - @Override - public void reqX11Forwarding(String authProto, String authCookie, int screen) - throws ConnectionException, TransportException { - session.reqX11Forwarding(authProto, authCookie, screen); - } - - @Override - public void setEnvVar(String name, String value) throws ConnectionException, TransportException { - session.setEnvVar(name, value); - } - - @Override - public Shell startShell() throws ConnectionException, TransportException { - return session.startShell(); - } - - @Override - public Subsystem startSubsystem(String name) throws ConnectionException, TransportException { - return session.startSubsystem(name); - } - - @Override - public void close() throws TransportException, ConnectionException { - onCloseFunction.accept(getID()); - session.close(); - } - - @Override - public boolean getAutoExpand() { - return session.getAutoExpand(); - } - - @Override - public int getID() { - return session.getID(); - } - - @Override - public InputStream getInputStream() { - return session.getInputStream(); - } - - @Override - public int getLocalMaxPacketSize() { - return session.getLocalMaxPacketSize(); - } - - @Override - public long getLocalWinSize() { - return session.getLocalWinSize(); - } - - @Override - public OutputStream getOutputStream() { - return session.getOutputStream(); - } - - @Override - public int getRecipient() { - return session.getRecipient(); - } - - @Override - public Charset getRemoteCharset() { - return session.getRemoteCharset(); - } - - @Override - public int getRemoteMaxPacketSize() { - return session.getRemoteMaxPacketSize(); - } - - @Override - public long getRemoteWinSize() { - return session.getRemoteWinSize(); - } - - @Override - public String getType() { - return session.getType(); - } - - @Override - public boolean isOpen() { - return session.isOpen(); - } - - @Override - public void setAutoExpand(boolean autoExpand) { - session.setAutoExpand(autoExpand); - } - - @Override - public void join() throws ConnectionException { - session.join(); - } - - @Override - public void join(long timeout, TimeUnit unit) throws ConnectionException { - session.join(timeout, unit); - } - - @Override - public boolean isEOF() { - return session.isEOF(); - } - - @Override - public LoggerFactory getLoggerFactory() { - return session.getLoggerFactory(); - } - - @Override - public void notifyError(SSHException error) { - session.notifyError(error); - } - - @Override - public void handle(Message msg, SSHPacket buf) throws SSHException { - session.handle(msg, buf); - } - - public boolean isErrored() { - return originalSSHClient.isErrored(); - } - - public void setErrored(boolean errored) { - this.originalSSHClient.setErrored(errored); - } -}
