Fixed AIRAVATA-2284, using guave caching with time based eviction

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/644bf896
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/644bf896
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/644bf896

Branch: refs/heads/develop
Commit: 644bf89629ac61ab35917bafa737ecbcbe547463
Parents: c50e3ec
Author: Shameera Rathnayaka <[email protected]>
Authored: Fri Dec 30 15:50:38 2016 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Fri Dec 30 15:50:38 2016 -0500

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  8 ++++
 .../main/resources/airavata-server.properties   |  2 +
 .../org/apache/airavata/gfac/impl/Factory.java  | 39 ++++++++++++++++----
 .../airavata/gfac/impl/HPCRemoteCluster.java    | 29 ++++++---------
 .../airavata/messaging/core/Subscriber.java     |  2 -
 5 files changed, 54 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git 
a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index cdf49e0..7ab807e 100644
--- 
a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -127,6 +127,9 @@ public class ServerSettings extends ApplicationSettings {
     public static final String USER_PROFILE_SERVER_HOST = 
"user.profile.server.host";
     public static final String USER_PROFILE_SERVER_PORT = 
"user.profile.server.port";
 
+    /* Caching */
+    private static final String SESSION_CACHE_ACCESS_TIME_OUT = 
"ssh.session.cache.access.timeout";
+
     // todo until AIRAVATA-2066 is finished, keep server side list 
configurations here.
     private static Map<String, String[]> listConfigurations = new HashMap<>();
 
@@ -470,4 +473,9 @@ public class ServerSettings extends ApplicationSettings {
     public static int getAuroraSchedulerTimeout() throws 
ApplicationSettingsException {
        return Integer.valueOf(getSetting(AURORA_SCHEDULER_CONNECT_TIMEOUT_MS));
     }
+
+    public static int getSessionCacheAccessTimeout() {
+        return Integer.valueOf(getSetting(SESSION_CACHE_ACCESS_TIME_OUT, 
"30"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git 
a/modules/configuration/server/src/main/resources/airavata-server.properties 
b/modules/configuration/server/src/main/resources/airavata-server.properties
index ffc9ea8..defc1cc 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -170,6 +170,8 @@ gfac.server.port=8950
 gfac.thread.pool.size=50
 host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler
 
+# ssh session access timeout in minutes default is 30 minutes
+#ssh.session.cache.access.timeout=30
 
 ###########################################################################
 #  Registry Server Configurations

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index bf06086..fda5386 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -20,6 +20,9 @@
  */
 package org.apache.airavata.gfac.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
 import com.jcraft.jsch.Channel;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSch;
@@ -33,7 +36,6 @@ import 
org.apache.airavata.credential.store.credential.Credential;
 import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -55,13 +57,21 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
-import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.job.ForkJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.cloud.AuroraJobMonitor;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.messaging.core.*;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
@@ -81,7 +91,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 public abstract class Factory {
 
@@ -106,6 +121,7 @@ public abstract class Factory {
        private static Map<MonitorMode, JobMonitor> jobMonitorServices = new 
HashMap<>();
        private static Subscriber processLaunchSubscriber;
        private static Map<String, Session> sessionMap = new HashMap<>();
+       private static Cache<String,Session> sessionCache;
 
        public static GFacEngine getGFacEngine() throws GFacException {
                if (engine == null) {
@@ -422,6 +438,15 @@ public abstract class Factory {
                } catch (Exception e) {
                        throw new GFacException("Gfac config issue", e);
                }
+
+               sessionCache = CacheBuilder.newBuilder()
+                               
.expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), 
TimeUnit.MINUTES)
+                               .removalListener((RemovalListener<String, 
Session>) removalNotification -> {
+                                       if 
(removalNotification.getValue().isConnected()) {
+                                               
removalNotification.getValue().disconnect();
+                                       }
+                })
+                               .build();
        }
 
        public static JobMonitor getMonitorService(MonitorMode monitorMode) 
throws AiravataException, GFacException {
@@ -479,7 +504,7 @@ public abstract class Factory {
                        throw new GFacException("Support ssh key authentication 
only");
                }
                String key = buildKey(serverInfo);
-               Session session = sessionMap.get(key);
+               Session session = sessionCache.getIfPresent(key);
                boolean valid = isValidSession(session);
                // FIXME - move following info logs to debug
                if (valid) {
@@ -514,7 +539,7 @@ public abstract class Factory {
                                        
session.setConfig("StrictHostKeyChecking", "no");
                                }
                                session.connect(); // 0 connection timeout
-                               sessionMap.put(key, session);
+                               sessionCache.put(key, session);
                        } catch (JSchException e) {
                                throw new GFacException("JSch initialization 
error ", e);
                        }
@@ -522,7 +547,7 @@ public abstract class Factory {
                        // FIXME - move following info log to debug
                        log.info("Reuse SSH session for :" + key);
                }
-               return sessionMap.get(key);
+               return session;
 
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 5002a6b..262f7f7 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -55,7 +55,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        private static final int MAX_RETRY_COUNT = 3;
        private final SSHKeyAuthentication authentication;
        private final JSch jSch;
-       private Session session;
 
        public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration 
jobManagerConfiguration, AuthenticationInfo
                        authenticationInfo) throws AiravataException, 
GFacException {
@@ -69,7 +68,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
                        jSch = new JSch();
                        jSch.addIdentity(UUID.randomUUID().toString(), 
authentication.getPrivateKey(), authentication.getPublicKey(),
                                        
authentication.getPassphrase().getBytes());
-                       session = Factory.getSSHSession(authenticationInfo, 
serverInfo);
                } catch (JSchException e) {
                        throw new AiravataException("JSch initialization error 
", e);
                }
@@ -120,13 +118,11 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
                int retry = 3;
                while (retry > 0) {
                        try {
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring localhost:" + localFile  
+ " to " + serverInfo.getHost() + ":" + remoteFile);
-                               SSHUtils.scpTo(localFile, remoteFile, session);
+                               SSHUtils.scpTo(localFile, remoteFile,  
getSshSession());
                                retry = 0;
                        } catch (Exception e) {
                                retry--;
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                if (retry == 0) {
                                        throw new GFacException("Failed to scp 
localhost:" + localFile + " to " + serverInfo.getHost() +
                                                        ":" + remoteFile, e);
@@ -138,18 +134,20 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
                }
        }
 
+       private Session getSshSession() throws GFacException {
+               return Factory.getSSHSession(authenticationInfo, serverInfo);
+       }
+
        @Override
        public void copyFrom(String remoteFile, String localFile) throws 
GFacException {
                int retry = 3;
                while(retry>0) {
                        try {
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring " + serverInfo.getHost() 
+ ":" + remoteFile + " To localhost:" + localFile);
-                               SSHUtils.scpFrom(remoteFile, localFile, 
session);
+                               SSHUtils.scpFrom(remoteFile, localFile, 
getSession());
                                retry=0;
                        } catch (Exception e) {
                                retry--;
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                if (retry == 0) {
                                        throw new GFacException("Failed to scp 
" + serverInfo.getHost() + ":" + remoteFile + " to " +
                                                        "localhost:" + 
localFile, e);
@@ -170,13 +168,12 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
                try {
                        while (retryCount < MAX_RETRY_COUNT) {
                                retryCount++;
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring from:" + sourceFile + " 
To: " + destinationFile);
                                try {
                                        if (direction == DIRECTION.FROM) {
-                        SSHUtils.scpThirdParty(sourceFile, session, 
destinationFile, clientSession, ignoreEmptyFile);
+                        SSHUtils.scpThirdParty(sourceFile, getSession(), 
destinationFile, clientSession, ignoreEmptyFile);
                     } else {
-                        SSHUtils.scpThirdParty(sourceFile, clientSession, 
destinationFile, session, ignoreEmptyFile);
+                        SSHUtils.scpThirdParty(sourceFile, clientSession, 
destinationFile, getSession(), ignoreEmptyFile);
                     }
                                        break; // exit while loop
                                } catch (JSchException e) {
@@ -200,10 +197,9 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
                try {
                        while (retryCount < MAX_RETRY_COUNT) {
                                retryCount++;
-                               session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Creating directory: " + 
serverInfo.getHost() + ":" + directoryPath);
                                try {
-                                       SSHUtils.makeDirectory(directoryPath, 
session);
+                                       SSHUtils.makeDirectory(directoryPath, 
getSession());
                                        break;  // Exit while loop
                                } catch (JSchException e) {
                                        if (retryCount == MAX_RETRY_COUNT) {
@@ -260,9 +256,8 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        @Override
        public List<String> listDirectory(String directoryPath) throws 
GFacException {
                try {
-                       session = Factory.getSSHSession(authenticationInfo, 
serverInfo);
                        log.info("Creating directory: " + serverInfo.getHost() 
+ ":" + directoryPath);
-                       return SSHUtils.listDirectory(directoryPath, session);
+                       return SSHUtils.listDirectory(directoryPath, 
getSession());
                } catch (JSchException | IOException e) {
                        throw new GFacException("Failed to list directory " + 
serverInfo.getHost() + ":" + directoryPath, e);
                }
@@ -277,7 +272,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 
        @Override
        public Session getSession() throws GFacException {
-               return Factory.getSSHSession(authenticationInfo, serverInfo);
+               return getSshSession();
        }
 
        @Override
@@ -314,7 +309,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
                        while (retryCount < MAX_RETRY_COUNT) {
                                retryCount++;
                                try {
-                                       session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
+                                       Session session = getSshSession();
                                        channelExec = ((ChannelExec) 
session.openChannel("exec"));
                                        channelExec.setCommand(command);
                                        channelExec.setInputStream(null);

http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
index cc357a0..5c8a65f 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -26,10 +26,8 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Consumer;
 import org.apache.airavata.common.exception.AiravataException;
 
-import javax.annotation.Nonnull;
 import java.util.List;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 
 /**
  * This is the basic consumer

Reply via email to