This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch service-layer-improvements
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit f5bf7c9fcc2cee8483e6e45fb8bb94340812f8d7
Author: yasithdev <[email protected]>
AuthorDate: Tue Dec 9 18:20:36 2025 -0600

    cleanup wrappers
---
 .../airavata/helix/adaptor/PoolingSSHJClient.java  | 175 ++++++++++++++-----
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 138 +++++++--------
 .../adaptor/wrapper/SCPFileTransferWrapper.java    | 106 -----------
 .../helix/adaptor/wrapper/SFTPClientWrapper.java   |  50 ------
 .../helix/adaptor/wrapper/SSHClientWrapper.java    |  44 -----
 .../helix/adaptor/wrapper/SessionWrapper.java      | 194 ---------------------
 6 files changed, 197 insertions(+), 510 deletions(-)

diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 5952e4e14d..b9bb039f7b 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -36,10 +36,8 @@ import net.schmizz.sshj.transport.TransportException;
 import net.schmizz.sshj.transport.verification.HostKeyVerifier;
 import net.schmizz.sshj.userauth.UserAuthException;
 import net.schmizz.sshj.userauth.method.AuthMethod;
-import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
-import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
-import org.apache.airavata.helix.adaptor.wrapper.SSHClientWrapper;
-import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +52,8 @@ public class PoolingSSHJClient extends SSHClient {
     private static final Logger logger = 
LoggerFactory.getLogger(PoolingSSHJClient.class);
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final Map<SSHClientWrapper, SSHClientInfo> clientInfoMap = new 
HashMap<>();
+    private final Map<SSHClient, SSHClientInfo> clientInfoMap = new 
HashMap<>();
+    private final Map<SSHClient, Boolean> clientErrorMap = new HashMap<>();
 
     private HostKeyVerifier hostKeyVerifier;
     private String username;
@@ -100,8 +99,8 @@ public class PoolingSSHJClient extends SSHClient {
 
     ////////////////// client specific operations ///////
 
-    private SSHClientWrapper newClientWithSessionValidation() throws 
IOException {
-        SSHClientWrapper newClient = createNewSSHClient();
+    private SSHClient newClientWithSessionValidation() throws IOException {
+        SSHClient newClient = createNewSSHClient();
         SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(), 
clientInfoMap.size());
         clientInfoMap.put(newClient, info);
 
@@ -137,7 +136,7 @@ public class PoolingSSHJClient extends SSHClient {
         return newClient;
     }
 
-    private SSHClientWrapper leaseSSHClient() throws Exception {
+    private SSHClient leaseSSHClient() throws Exception {
         lock.writeLock().lock();
 
         try {
@@ -146,10 +145,10 @@ public class PoolingSSHJClient extends SSHClient {
 
             } else {
 
-                Optional<Map.Entry<SSHClientWrapper, SSHClientInfo>> 
minEntryOp = clientInfoMap.entrySet().stream()
+                Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = 
clientInfoMap.entrySet().stream()
                         .min(Comparator.comparing(entry -> 
entry.getValue().sessionCount));
                 if (minEntryOp.isPresent()) {
-                    Map.Entry<SSHClientWrapper, SSHClientInfo> minEntry = 
minEntryOp.get();
+                    Map.Entry<SSHClient, SSHClientInfo> minEntry = 
minEntryOp.get();
                     // use the connection with least amount of sessions 
created.
 
                     logger.debug(
@@ -176,9 +175,9 @@ public class PoolingSSHJClient extends SSHClient {
                         
minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
                         
minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
 
-                        SSHClientWrapper sshClient = minEntry.getKey();
+                        SSHClient sshClient = minEntry.getKey();
 
-                        if (!sshClient.isConnected() || 
!sshClient.isAuthenticated() || sshClient.isErrored()) {
+                        if (!sshClient.isConnected() || 
!sshClient.isAuthenticated() || isClientErrored(sshClient)) {
                             logger.warn(
                                     "Client for host {} is not connected or 
not authenticated. Creating a new client",
                                     host);
@@ -198,14 +197,14 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void removeDisconnectedClients(SSHClientWrapper client, boolean 
doDisconnect) {
+    private void removeDisconnectedClients(SSHClient client, boolean 
doDisconnect) {
         lock.writeLock().lock();
 
         if (doDisconnect) {
             try {
                 client.disconnect();
             } catch (Exception e) {
-                log.warn("Errored while disconnecting the client " + 
e.getMessage());
+                logger.warn("Errored while disconnecting the client " + 
e.getMessage());
                 // Ignore
             }
         }
@@ -217,6 +216,7 @@ public class PoolingSSHJClient extends SSHClient {
                         clientInfoMap.get(client).getClientId(),
                         host);
                 clientInfoMap.remove(client);
+                clientErrorMap.remove(client);
             }
 
         } finally {
@@ -224,7 +224,7 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void untrackClosedSessions(SSHClientWrapper client, int sessionId) 
{
+    public void untrackClosedSessions(SSHClient client, int sessionId) {
         lock.writeLock().lock();
 
         try {
@@ -243,7 +243,7 @@ public class PoolingSSHJClient extends SSHClient {
     }
 
     private void removeStaleConnections() {
-        List<Map.Entry<SSHClientWrapper, SSHClientInfo>> entriesTobeRemoved;
+        List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved;
         lock.writeLock().lock();
         logger.info("Current active connections for  {} @ {} : {} are {}", 
username, host, port, clientInfoMap.size());
         try {
@@ -266,19 +266,20 @@ public class PoolingSSHJClient extends SSHClient {
         entriesTobeRemoved.forEach(entry -> {
             try {
                 entry.getKey().disconnect();
+                clientErrorMap.remove(entry.getKey());
             } catch (IOException e) {
                 logger.warn("Failed to disconnect connection {} for host {}", 
entry.getValue().clientId, host);
             }
         });
     }
 
-    private SSHClientWrapper createNewSSHClient() throws IOException {
+    private SSHClient createNewSSHClient() throws IOException {
 
-        SSHClientWrapper sshClient;
+        SSHClient sshClient;
         if (config != null) {
-            sshClient = new SSHClientWrapper(config);
+            sshClient = new SSHClient(config);
         } else {
-            sshClient = new SSHClientWrapper();
+            sshClient = new SSHClient();
         }
 
         sshClient.getConnection().getTransport().setDisconnectListener(new 
DisconnectListener() {
@@ -304,70 +305,150 @@ public class PoolingSSHJClient extends SSHClient {
         return sshClient;
     }
 
-    public SessionWrapper startSessionWrapper() throws Exception {
+    public static class SessionResource {
+        private final Session session;
+        private final SSHClient sshClient;
+        private final PoolingSSHJClient pool;
 
-        final SSHClientWrapper sshClient = leaseSSHClient();
+        SessionResource(Session session, SSHClient sshClient, 
PoolingSSHJClient pool) {
+            this.session = session;
+            this.sshClient = sshClient;
+            this.pool = pool;
+        }
 
-        try {
-            return new SessionWrapper(
-                    sshClient.startSession(), (id) -> 
untrackClosedSessions(sshClient, id), sshClient);
+        public Session getSession() {
+            return session;
+        }
+
+        /**
+         * Closes the underlying SSH session and untracks it on the pooled client.
+         *
+         * <p>The untrack call runs in a {@code finally} block so the pool's
+         * per-client bookkeeping is still updated when closing the session throws;
+         * the exception from {@code session.close()} is still propagated.
+         *
+         * @throws Exception if closing the underlying session fails
+         */
+        public void close() throws Exception {
+            try {
+                session.close();
+            } finally {
+                pool.untrackClosedSessions(sshClient, session.getID());
+            }
+        }
+
+        public void markErrored() {
+            pool.markClientErrored(sshClient);
+        }
+    }
+
+    public static class SCPFileTransferResource {
+        private final SCPFileTransfer fileTransfer;
+        private final SSHClient sshClient;
+        private final PoolingSSHJClient pool;
+
+        SCPFileTransferResource(SCPFileTransfer fileTransfer, SSHClient 
sshClient, PoolingSSHJClient pool) {
+            this.fileTransfer = fileTransfer;
+            this.sshClient = sshClient;
+            this.pool = pool;
+        }
+
+        public SCPFileTransfer getFileTransfer() {
+            return fileTransfer;
+        }
 
+        public void close() throws Exception {
+            // SCPFileTransfer needs no explicit close; just untrack the
+            // session on the pooled client.
+            pool.untrackClosedSessions(sshClient, -1);
+        }
+
+        public void markErrored() {
+            pool.markClientErrored(sshClient);
+        }
+    }
+
+    public static class SFTPClientResource {
+        private final SFTPClient sftpClient;
+        private final SSHClient sshClient;
+        private final PoolingSSHJClient pool;
+
+        SFTPClientResource(SFTPClient sftpClient, SSHClient sshClient, 
PoolingSSHJClient pool) {
+            this.sftpClient = sftpClient;
+            this.sshClient = sshClient;
+            this.pool = pool;
+        }
+
+        public SFTPClient getSFTPClient() {
+            return sftpClient;
+        }
+
+        /**
+         * Closes the SFTP client and untracks its session on the pooled client.
+         *
+         * <p>The untrack call runs in a {@code finally} block so the pool's
+         * per-client bookkeeping is still updated when {@code sftpClient.close()}
+         * throws; that exception is still propagated to the caller.
+         *
+         * @throws Exception if closing the SFTP client fails
+         */
+        public void close() throws Exception {
+            try {
+                sftpClient.close();
+            } finally {
+                // No real session id is available here; -1 matches the other untrack calls in this class.
+                pool.untrackClosedSessions(sshClient, -1);
+            }
+        }
+
+        public void markErrored() {
+            pool.markClientErrored(sshClient);
+        }
+    }
+
+    public SessionResource startSessionResource() throws Exception {
+        final SSHClient sshClient = leaseSSHClient();
+        try {
+            Session session = sshClient.startSession();
+            return new SessionResource(session, sshClient, this);
         } catch (Exception e) {
             if (sshClient != null) {
                 // If it is a ConnectionExceptions, explicitly invalidate the 
client
                 if (e instanceof ConnectionException) {
-                    sshClient.setErrored(true);
+                    markClientErrored(sshClient);
                 }
-
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
         }
     }
 
-    public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception 
{
-
-        final SSHClientWrapper sshClient = leaseSSHClient();
-
+    public SCPFileTransferResource newSCPFileTransferResource() throws 
Exception {
+        final SSHClient sshClient = leaseSSHClient();
         try {
-            return new SCPFileTransferWrapper(
-                    sshClient.newSCPFileTransfer(), (id) -> 
untrackClosedSessions(sshClient, id), sshClient);
-
+            SCPFileTransfer fileTransfer = sshClient.newSCPFileTransfer();
+            return new SCPFileTransferResource(fileTransfer, sshClient, this);
         } catch (Exception e) {
-
             if (sshClient != null) {
                 // If it is a ConnectionExceptions, explicitly invalidate the 
client
                 if (e instanceof ConnectionException) {
-                    sshClient.setErrored(true);
+                    markClientErrored(sshClient);
                 }
-
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
         }
     }
 
-    public SFTPClientWrapper newSFTPClientWrapper() throws Exception {
-
-        final SSHClientWrapper sshClient = leaseSSHClient();
-
+    public SFTPClientResource newSFTPClientResource() throws Exception {
+        final SSHClient sshClient = leaseSSHClient();
         try {
-            return new SFTPClientWrapper(
-                    sshClient.newSFTPClient(), (id) -> 
untrackClosedSessions(sshClient, id), sshClient);
+            SFTPClient sftpClient = sshClient.newSFTPClient();
+            return new SFTPClientResource(sftpClient, sshClient, this);
         } catch (Exception e) {
-
             if (sshClient != null) {
                 // If it is a ConnectionExceptions, explicitly invalidate the 
client
                 if (e instanceof ConnectionException) {
-                    sshClient.setErrored(true);
+                    markClientErrored(sshClient);
                 }
-
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
         }
     }
 
+    public void markClientErrored(SSHClient client) { // records the client as errored so leaseSSHClient will replace it
+        lock.writeLock().lock(); // clientErrorMap is a plain HashMap, so mutate it only under the write lock
+        try {
+            clientErrorMap.put(client, true);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean isClientErrored(SSHClient client) { // true if the client has an errored entry in clientErrorMap
+        lock.readLock().lock(); // NOTE(review): appears to be called from leaseSSHClient while the write lock is held; ReentrantReadWriteLock permits that
+        try {
+            return clientErrorMap.getOrDefault(client, false); // a missing entry means the client was never marked errored
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     public class SSHClientInfo {
 
         private int sessionCount;
@@ -461,7 +542,7 @@ public class PoolingSSHJClient extends SSHClient {
         return this;
     }
 
-    public Map<SSHClientWrapper, SSHClientInfo> getClientInfoMap() {
+    public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
         return clientInfoMap;
     }
 }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 82b55aae9c..a138b6341a 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -43,9 +43,9 @@ import net.schmizz.sshj.xfer.LocalDestFile;
 import net.schmizz.sshj.xfer.LocalFileFilter;
 import net.schmizz.sshj.xfer.LocalSourceFile;
 import org.apache.airavata.agents.api.*;
-import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
-import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
-import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
+import org.apache.airavata.helix.adaptor.PoolingSSHJClient.SessionResource;
+import 
org.apache.airavata.helix.adaptor.PoolingSSHJClient.SCPFileTransferResource;
+import org.apache.airavata.helix.adaptor.PoolingSSHJClient.SFTPClientResource;
 import org.apache.airavata.helix.agent.ssh.StandardOutReader;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -222,11 +222,11 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public CommandOutput executeCommand(String command, String 
workingDirectory) throws AgentException {
-        SessionWrapper session = null;
+        SessionResource sessionResource = null;
         try {
-            session = sshjClient.startSessionWrapper();
+            sessionResource = sshjClient.startSessionResource();
             Session.Command exec =
-                    session.exec((workingDirectory != null ? "cd " + 
workingDirectory + "; " : "") + command);
+                    sessionResource.getSession().exec((workingDirectory != 
null ? "cd " + workingDirectory + "; " : "") + command);
             StandardOutReader standardOutReader = new StandardOutReader();
 
             try {
@@ -241,15 +241,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(session).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sessionResource).ifPresent(SessionResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(session).ifPresent(ss -> {
+            Optional.ofNullable(sessionResource).ifPresent(ss -> {
                 try {
                     ss.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -263,25 +263,25 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void createDirectory(String path, boolean recursive) throws 
AgentException {
-        SFTPClientWrapper sftpClient = null;
+        SFTPClientResource sftpClientResource = null;
         try {
-            sftpClient = sshjClient.newSFTPClientWrapper();
+            sftpClientResource = sshjClient.newSFTPClientResource();
             if (recursive) {
-                sftpClient.mkdirs(path);
+                sftpClientResource.getSFTPClient().mkdirs(path);
             } else {
-                sftpClient.mkdir(path);
+                sftpClientResource.getSFTPClient().mkdir(path);
             }
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(sftpClient).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(sftpClient).ifPresent(client -> {
+            Optional.ofNullable(sftpClientResource).ifPresent(client -> {
                 try {
                     client.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -293,22 +293,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
         if (path == null || path.trim().isEmpty()) {
             throw new AgentException("Directory path cannot be null or empty");
         }
-        SFTPClientWrapper sftpClient = null;
+        SFTPClientResource sftpClientResource = null;
         try {
-            sftpClient = sshjClient.newSFTPClientWrapper();
-            sftpClient.rmdir(path);
+            sftpClientResource = sshjClient.newSFTPClientResource();
+            sftpClientResource.getSFTPClient().rmdir(path);
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(sftpClient).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored);
             }
             logger.error("Error while deleting directory {}", path, e);
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(sftpClient).ifPresent(client -> {
+            Optional.ofNullable(sftpClientResource).ifPresent(client -> {
                 try {
                     client.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -317,22 +317,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void uploadFile(String localFile, String remoteFile) throws 
AgentException {
-        SCPFileTransferWrapper fileTransfer = null;
+        SCPFileTransferResource fileTransferResource = null;
         try {
-            fileTransfer = sshjClient.newSCPFileTransferWrapper();
-            fileTransfer.upload(localFile, remoteFile);
+            fileTransferResource = sshjClient.newSCPFileTransferResource();
+            fileTransferResource.getFileTransfer().upload(localFile, 
remoteFile);
 
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(fileTransfer).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper 
-> {
+            Optional.ofNullable(fileTransferResource).ifPresent(resource -> {
                 try {
-                    scpFileTransferWrapper.close();
-                } catch (IOException e) {
+                    resource.close();
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -341,11 +341,11 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void uploadFile(InputStream localInStream, FileMetadata metadata, 
String remoteFile) throws AgentException {
-        SCPFileTransferWrapper fileTransfer = null;
+        SCPFileTransferResource fileTransferResource = null;
 
         try {
-            fileTransfer = sshjClient.newSCPFileTransferWrapper();
-            fileTransfer.upload(
+            fileTransferResource = sshjClient.newSCPFileTransferResource();
+            fileTransferResource.getFileTransfer().upload(
                     new LocalSourceFile() {
                         @Override
                         public String getName() {
@@ -401,15 +401,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                     remoteFile);
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(fileTransfer).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper 
-> {
+            Optional.ofNullable(fileTransferResource).ifPresent(resource -> {
                 try {
-                    scpFileTransferWrapper.close();
-                } catch (IOException e) {
+                    resource.close();
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -418,22 +418,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void downloadFile(String remoteFile, String localFile) throws 
AgentException {
-        SCPFileTransferWrapper fileTransfer = null;
+        SCPFileTransferResource fileTransferResource = null;
         try {
-            fileTransfer = sshjClient.newSCPFileTransferWrapper();
-            fileTransfer.download(remoteFile, localFile);
+            fileTransferResource = sshjClient.newSCPFileTransferResource();
+            fileTransferResource.getFileTransfer().download(remoteFile, 
localFile);
 
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(fileTransfer).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper 
-> {
+            Optional.ofNullable(fileTransferResource).ifPresent(resource -> {
                 try {
-                    scpFileTransferWrapper.close();
-                } catch (IOException e) {
+                    resource.close();
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -443,10 +443,10 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
     @Override
     public void downloadFile(String remoteFile, OutputStream localOutStream, 
FileMetadata metadata)
             throws AgentException {
-        SCPFileTransferWrapper fileTransfer = null;
+        SCPFileTransferResource fileTransferResource = null;
         try {
-            fileTransfer = sshjClient.newSCPFileTransferWrapper();
-            fileTransfer.download(remoteFile, new LocalDestFile() {
+            fileTransferResource = sshjClient.newSCPFileTransferResource();
+            fileTransferResource.getFileTransfer().download(remoteFile, new 
LocalDestFile() {
                 @Override
                 public long getLength() {
                     return metadata.getSize();
@@ -488,15 +488,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
             });
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(fileTransfer).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(fileTransferResource).ifPresent(SCPFileTransferResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper 
-> {
+            Optional.ofNullable(fileTransferResource).ifPresent(resource -> {
                 try {
-                    scpFileTransferWrapper.close();
-                } catch (IOException e) {
+                    resource.close();
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -505,22 +505,22 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public List<String> listDirectory(String path) throws AgentException {
-        SFTPClientWrapper sftpClient = null;
+        SFTPClientResource sftpClientResource = null;
         try {
-            sftpClient = sshjClient.newSFTPClientWrapper();
-            List<RemoteResourceInfo> ls = sftpClient.ls(path);
+            sftpClientResource = sshjClient.newSFTPClientResource();
+            List<RemoteResourceInfo> ls = 
sftpClientResource.getSFTPClient().ls(path);
             return 
ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(sftpClient).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(sftpClient).ifPresent(client -> {
+            Optional.ofNullable(sftpClientResource).ifPresent(client -> {
                 try {
                     client.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -529,21 +529,21 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public Boolean doesFileExist(String filePath) throws AgentException {
-        SFTPClientWrapper sftpClient = null;
+        SFTPClientResource sftpClientResource = null;
         try {
-            sftpClient = sshjClient.newSFTPClientWrapper();
-            return sftpClient.statExistence(filePath) != null;
+            sftpClientResource = sshjClient.newSFTPClientResource();
+            return sftpClientResource.getSFTPClient().statExistence(filePath) 
!= null;
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(sftpClient).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(sftpClient).ifPresent(client -> {
+            Optional.ofNullable(sftpClientResource).ifPresent(client -> {
                 try {
                     client.close();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     // Ignore
                 }
             });
@@ -579,10 +579,10 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public FileMetadata getFileMetadata(String remoteFile) throws 
AgentException {
-        SFTPClientWrapper sftpClient = null;
+        SFTPClientResource sftpClientResource = null;
         try {
-            sftpClient = sshjClient.newSFTPClientWrapper();
-            FileAttributes stat = sftpClient.stat(remoteFile);
+            sftpClientResource = sshjClient.newSFTPClientResource();
+            FileAttributes stat = 
sftpClientResource.getSFTPClient().stat(remoteFile);
             FileMetadata metadata = new FileMetadata();
             metadata.setName(new File(remoteFile).getName());
             metadata.setSize(stat.getSize());
@@ -591,15 +591,15 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
             return metadata;
         } catch (Exception e) {
             if (e instanceof ConnectionException) {
-                Optional.ofNullable(sftpClient).ifPresent(ft -> 
ft.setErrored(true));
+                
Optional.ofNullable(sftpClientResource).ifPresent(SFTPClientResource::markErrored);
             }
             throw new AgentException(e);
 
         } finally {
-            Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper 
-> {
+            Optional.ofNullable(sftpClientResource).ifPresent(resource -> {
                 try {
-                    scpFileTransferWrapper.close();
-                } catch (IOException e) {
+                    resource.close();
+                } catch (Exception e) {
                     // Ignore
                 }
             });
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
deleted file mode 100644
index 2b694f4a62..0000000000
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.airavata.helix.adaptor.wrapper;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Consumer;
-import net.schmizz.sshj.xfer.FileTransfer;
-import net.schmizz.sshj.xfer.LocalDestFile;
-import net.schmizz.sshj.xfer.LocalSourceFile;
-import net.schmizz.sshj.xfer.TransferListener;
-import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
-
-public class SCPFileTransferWrapper implements FileTransfer, Closeable {
-
-    private SCPFileTransfer scpFileTransfer;
-    private Consumer<Integer> onCloseFunction;
-    private SSHClientWrapper originalSSHClient;
-
-    public SCPFileTransferWrapper(
-            SCPFileTransfer scpFileTransfer, Consumer<Integer> 
onCloseFunction, SSHClientWrapper originalSSHClient) {
-        this.scpFileTransfer = scpFileTransfer;
-        this.onCloseFunction = onCloseFunction;
-        this.originalSSHClient = originalSSHClient;
-    }
-
-    @Override
-    public void upload(String localPath, String remotePath) throws IOException 
{
-        scpFileTransfer.upload(localPath, remotePath);
-    }
-
-    @Override
-    public void upload(String localPath, String remotePath, long byteOffset) 
throws IOException {
-        scpFileTransfer.upload(localPath, remotePath, byteOffset);
-    }
-
-    @Override
-    public void download(String remotePath, String localPath) throws 
IOException {
-        scpFileTransfer.download(remotePath, localPath);
-    }
-
-    @Override
-    public void download(String remotePath, String localPath, long byteOffset) 
throws IOException {
-        scpFileTransfer.download(remotePath, localPath, byteOffset);
-    }
-
-    @Override
-    public void upload(LocalSourceFile localFile, String remotePath) throws 
IOException {
-        scpFileTransfer.upload(localFile, remotePath);
-    }
-
-    @Override
-    public void upload(LocalSourceFile localFile, String remotePath, long 
byteOffset) throws IOException {
-        scpFileTransfer.upload(localFile, remotePath, byteOffset);
-    }
-
-    @Override
-    public void download(String remotePath, LocalDestFile localFile) throws 
IOException {
-        scpFileTransfer.download(remotePath, localFile);
-    }
-
-    @Override
-    public void download(String remotePath, LocalDestFile localFile, long 
byteOffset) throws IOException {
-        scpFileTransfer.download(remotePath, localFile, byteOffset);
-    }
-
-    @Override
-    public TransferListener getTransferListener() {
-        return scpFileTransfer.getTransferListener();
-    }
-
-    @Override
-    public void setTransferListener(TransferListener listener) {
-        scpFileTransfer.setTransferListener(listener);
-    }
-
-    @Override
-    public void close() throws IOException {
-        onCloseFunction.accept(-1);
-    }
-
-    public boolean isErrored() {
-        return originalSSHClient.isErrored();
-    }
-
-    public void setErrored(boolean errored) {
-        this.originalSSHClient.setErrored(errored);
-    }
-}
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
deleted file mode 100644
index 35fdfb58ad..0000000000
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.airavata.helix.adaptor.wrapper;
-
-import java.io.IOException;
-import java.util.function.Consumer;
-import net.schmizz.sshj.sftp.SFTPClient;
-
-public class SFTPClientWrapper extends SFTPClient {
-    private Consumer<Integer> onCloseFunction;
-    private SSHClientWrapper originalSSHClient;
-
-    public SFTPClientWrapper(
-            SFTPClient sftpClient, Consumer<Integer> onCloseFunction, 
SSHClientWrapper originalSSHClient) {
-        super(sftpClient.getSFTPEngine());
-        this.onCloseFunction = onCloseFunction;
-        this.originalSSHClient = originalSSHClient;
-    }
-
-    @Override
-    public void close() throws IOException {
-        onCloseFunction.accept(-1);
-        super.close();
-    }
-
-    public boolean isErrored() {
-        return originalSSHClient.isErrored();
-    }
-
-    public void setErrored(boolean errored) {
-        this.originalSSHClient.setErrored(errored);
-    }
-}
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
deleted file mode 100644
index 702be794bd..0000000000
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.airavata.helix.adaptor.wrapper;
-
-import net.schmizz.sshj.Config;
-import net.schmizz.sshj.SSHClient;
-
-public class SSHClientWrapper extends SSHClient {
-
-    public SSHClientWrapper() {
-        super();
-    }
-
-    public SSHClientWrapper(Config config) {
-        super(config);
-    }
-
-    private boolean errored = false;
-
-    public boolean isErrored() {
-        return errored;
-    }
-
-    public void setErrored(boolean errored) {
-        this.errored = errored;
-    }
-}
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
 
b/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
deleted file mode 100644
index 8b50d3fbaf..0000000000
--- 
a/airavata-api/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.airavata.helix.adaptor.wrapper;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import net.schmizz.sshj.common.LoggerFactory;
-import net.schmizz.sshj.common.Message;
-import net.schmizz.sshj.common.SSHException;
-import net.schmizz.sshj.common.SSHPacket;
-import net.schmizz.sshj.connection.ConnectionException;
-import net.schmizz.sshj.connection.channel.direct.PTYMode;
-import net.schmizz.sshj.connection.channel.direct.Session;
-import net.schmizz.sshj.transport.TransportException;
-
-public class SessionWrapper implements Session {
-
-    private Session session;
-    private Consumer<Integer> onCloseFunction;
-    private SSHClientWrapper originalSSHClient;
-
-    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction, 
SSHClientWrapper originalSSHClient) {
-        this.session = session;
-        this.onCloseFunction = onCloseFunction;
-        this.originalSSHClient = originalSSHClient;
-    }
-
-    @Override
-    public void allocateDefaultPTY() throws ConnectionException, 
TransportException {
-        session.allocateDefaultPTY();
-    }
-
-    @Override
-    public void allocatePTY(String term, int cols, int rows, int width, int 
height, Map<PTYMode, Integer> modes)
-            throws ConnectionException, TransportException {
-        session.allocatePTY(term, cols, rows, width, height, modes);
-    }
-
-    @Override
-    public Command exec(String command) throws ConnectionException, 
TransportException {
-        return session.exec(command);
-    }
-
-    @Override
-    public void reqX11Forwarding(String authProto, String authCookie, int 
screen)
-            throws ConnectionException, TransportException {
-        session.reqX11Forwarding(authProto, authCookie, screen);
-    }
-
-    @Override
-    public void setEnvVar(String name, String value) throws 
ConnectionException, TransportException {
-        session.setEnvVar(name, value);
-    }
-
-    @Override
-    public Shell startShell() throws ConnectionException, TransportException {
-        return session.startShell();
-    }
-
-    @Override
-    public Subsystem startSubsystem(String name) throws ConnectionException, 
TransportException {
-        return session.startSubsystem(name);
-    }
-
-    @Override
-    public void close() throws TransportException, ConnectionException {
-        onCloseFunction.accept(getID());
-        session.close();
-    }
-
-    @Override
-    public boolean getAutoExpand() {
-        return session.getAutoExpand();
-    }
-
-    @Override
-    public int getID() {
-        return session.getID();
-    }
-
-    @Override
-    public InputStream getInputStream() {
-        return session.getInputStream();
-    }
-
-    @Override
-    public int getLocalMaxPacketSize() {
-        return session.getLocalMaxPacketSize();
-    }
-
-    @Override
-    public long getLocalWinSize() {
-        return session.getLocalWinSize();
-    }
-
-    @Override
-    public OutputStream getOutputStream() {
-        return session.getOutputStream();
-    }
-
-    @Override
-    public int getRecipient() {
-        return session.getRecipient();
-    }
-
-    @Override
-    public Charset getRemoteCharset() {
-        return session.getRemoteCharset();
-    }
-
-    @Override
-    public int getRemoteMaxPacketSize() {
-        return session.getRemoteMaxPacketSize();
-    }
-
-    @Override
-    public long getRemoteWinSize() {
-        return session.getRemoteWinSize();
-    }
-
-    @Override
-    public String getType() {
-        return session.getType();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return session.isOpen();
-    }
-
-    @Override
-    public void setAutoExpand(boolean autoExpand) {
-        session.setAutoExpand(autoExpand);
-    }
-
-    @Override
-    public void join() throws ConnectionException {
-        session.join();
-    }
-
-    @Override
-    public void join(long timeout, TimeUnit unit) throws ConnectionException {
-        session.join(timeout, unit);
-    }
-
-    @Override
-    public boolean isEOF() {
-        return session.isEOF();
-    }
-
-    @Override
-    public LoggerFactory getLoggerFactory() {
-        return session.getLoggerFactory();
-    }
-
-    @Override
-    public void notifyError(SSHException error) {
-        session.notifyError(error);
-    }
-
-    @Override
-    public void handle(Message msg, SSHPacket buf) throws SSHException {
-        session.handle(msg, buf);
-    }
-
-    public boolean isErrored() {
-        return originalSSHClient.isErrored();
-    }
-
-    public void setErrored(boolean errored) {
-        this.originalSSHClient.setErrored(errored);
-    }
-}


Reply via email to