Fixed AIRAVATA-2284, using Guava caching with time-based eviction
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/644bf896 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/644bf896 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/644bf896 Branch: refs/heads/develop Commit: 644bf89629ac61ab35917bafa737ecbcbe547463 Parents: c50e3ec Author: Shameera Rathnayaka <[email protected]> Authored: Fri Dec 30 15:50:38 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Fri Dec 30 15:50:38 2016 -0500 ---------------------------------------------------------------------- .../airavata/common/utils/ServerSettings.java | 8 ++++ .../main/resources/airavata-server.properties | 2 + .../org/apache/airavata/gfac/impl/Factory.java | 39 ++++++++++++++++---- .../airavata/gfac/impl/HPCRemoteCluster.java | 29 ++++++--------- .../airavata/messaging/core/Subscriber.java | 2 - 5 files changed, 54 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index cdf49e0..7ab807e 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -127,6 +127,9 @@ public class ServerSettings extends ApplicationSettings { public static final String USER_PROFILE_SERVER_HOST = "user.profile.server.host"; public static final String USER_PROFILE_SERVER_PORT = "user.profile.server.port"; + /* Caching */ + private static final String SESSION_CACHE_ACCESS_TIME_OUT = "ssh.session.cache.access.timeout"; + // 
todo until AIRAVATA-2066 is finished, keep server side list configurations here. private static Map<String, String[]> listConfigurations = new HashMap<>(); @@ -470,4 +473,9 @@ public class ServerSettings extends ApplicationSettings { public static int getAuroraSchedulerTimeout() throws ApplicationSettingsException { return Integer.valueOf(getSetting(AURORA_SCHEDULER_CONNECT_TIMEOUT_MS)); } + + public static int getSessionCacheAccessTimeout() { + return Integer.valueOf(getSetting(SESSION_CACHE_ACCESS_TIME_OUT, "30")); + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index ffc9ea8..defc1cc 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -170,6 +170,8 @@ gfac.server.port=8950 gfac.thread.pool.size=50 host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler +# ssh session access timeout in minutes default is 30 minutes +#ssh.session.cache.access.timeout=30 ########################################################################### # Registry Server Configurations http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index bf06086..fda5386 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -20,6 +20,9 @@ */ package org.apache.airavata.gfac.impl; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSch; @@ -33,7 +36,6 @@ import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.CredentialStoreException; -import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; @@ -55,13 +57,21 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher; -import org.apache.airavata.gfac.impl.job.*; +import org.apache.airavata.gfac.impl.job.ForkJobConfiguration; +import org.apache.airavata.gfac.impl.job.LSFJobConfiguration; +import org.apache.airavata.gfac.impl.job.PBSJobConfiguration; +import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration; +import org.apache.airavata.gfac.impl.job.UGEJobConfiguration; import org.apache.airavata.gfac.impl.task.ArchiveTask; import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl; import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl; import org.apache.airavata.gfac.monitor.cloud.AuroraJobMonitor; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; -import org.apache.airavata.messaging.core.*; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingFactory; 
+import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.Subscriber; +import org.apache.airavata.messaging.core.Type; import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; @@ -81,7 +91,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; public abstract class Factory { @@ -106,6 +121,7 @@ public abstract class Factory { private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>(); private static Subscriber processLaunchSubscriber; private static Map<String, Session> sessionMap = new HashMap<>(); + private static Cache<String,Session> sessionCache; public static GFacEngine getGFacEngine() throws GFacException { if (engine == null) { @@ -422,6 +438,15 @@ public abstract class Factory { } catch (Exception e) { throw new GFacException("Gfac config issue", e); } + + sessionCache = CacheBuilder.newBuilder() + .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES) + .removalListener((RemovalListener<String, Session>) removalNotification -> { + if (removalNotification.getValue().isConnected()) { + removalNotification.getValue().disconnect(); + } + }) + .build(); } public static JobMonitor getMonitorService(MonitorMode monitorMode) throws AiravataException, GFacException { @@ -479,7 +504,7 @@ public abstract class Factory { throw new GFacException("Support ssh key authentication only"); } String key = buildKey(serverInfo); - Session session = sessionMap.get(key); + Session session = sessionCache.getIfPresent(key); boolean valid = isValidSession(session); // FIXME - move following info logs 
to debug if (valid) { @@ -514,7 +539,7 @@ public abstract class Factory { session.setConfig("StrictHostKeyChecking", "no"); } session.connect(); // 0 connection timeout - sessionMap.put(key, session); + sessionCache.put(key, session); } catch (JSchException e) { throw new GFacException("JSch initialization error ", e); } @@ -522,7 +547,7 @@ public abstract class Factory { // FIXME - move following info log to debug log.info("Reuse SSH session for :" + key); } - return sessionMap.get(key); + return session; } http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index 5002a6b..262f7f7 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -55,7 +55,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ private static final int MAX_RETRY_COUNT = 3; private final SSHKeyAuthentication authentication; private final JSch jSch; - private Session session; public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) throws AiravataException, GFacException { @@ -69,7 +68,6 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ jSch = new JSch(); jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(), authentication.getPassphrase().getBytes()); - session = Factory.getSSHSession(authenticationInfo, serverInfo); } catch (JSchException e) { throw new AiravataException("JSch initialization error ", e); } @@ -120,13 +118,11 @@ public class 
HPCRemoteCluster extends AbstractRemoteCluster{ int retry = 3; while (retry > 0) { try { - session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring localhost:" + localFile + " to " + serverInfo.getHost() + ":" + remoteFile); - SSHUtils.scpTo(localFile, remoteFile, session); + SSHUtils.scpTo(localFile, remoteFile, getSshSession()); retry = 0; } catch (Exception e) { retry--; - session = Factory.getSSHSession(authenticationInfo, serverInfo); if (retry == 0) { throw new GFacException("Failed to scp localhost:" + localFile + " to " + serverInfo.getHost() + ":" + remoteFile, e); @@ -138,18 +134,20 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } } + private Session getSshSession() throws GFacException { + return Factory.getSSHSession(authenticationInfo, serverInfo); + } + @Override public void copyFrom(String remoteFile, String localFile) throws GFacException { int retry = 3; while(retry>0) { try { - session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:" + localFile); - SSHUtils.scpFrom(remoteFile, localFile, session); + SSHUtils.scpFrom(remoteFile, localFile, getSession()); retry=0; } catch (Exception e) { retry--; - session = Factory.getSSHSession(authenticationInfo, serverInfo); if (retry == 0) { throw new GFacException("Failed to scp " + serverInfo.getHost() + ":" + remoteFile + " to " + "localhost:" + localFile, e); @@ -170,13 +168,12 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ try { while (retryCount < MAX_RETRY_COUNT) { retryCount++; - session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring from:" + sourceFile + " To: " + destinationFile); try { if (direction == DIRECTION.FROM) { - SSHUtils.scpThirdParty(sourceFile, session, destinationFile, clientSession, ignoreEmptyFile); + SSHUtils.scpThirdParty(sourceFile, getSession(), destinationFile, clientSession, 
ignoreEmptyFile); } else { - SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile, session, ignoreEmptyFile); + SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile, getSession(), ignoreEmptyFile); } break; // exit while loop } catch (JSchException e) { @@ -200,10 +197,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ try { while (retryCount < MAX_RETRY_COUNT) { retryCount++; - session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath); try { - SSHUtils.makeDirectory(directoryPath, session); + SSHUtils.makeDirectory(directoryPath, getSession()); break; // Exit while loop } catch (JSchException e) { if (retryCount == MAX_RETRY_COUNT) { @@ -260,9 +256,8 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ @Override public List<String> listDirectory(String directoryPath) throws GFacException { try { - session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath); - return SSHUtils.listDirectory(directoryPath, session); + return SSHUtils.listDirectory(directoryPath, getSession()); } catch (JSchException | IOException e) { throw new GFacException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath, e); } @@ -277,7 +272,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ @Override public Session getSession() throws GFacException { - return Factory.getSSHSession(authenticationInfo, serverInfo); + return getSshSession(); } @Override @@ -314,7 +309,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ while (retryCount < MAX_RETRY_COUNT) { retryCount++; try { - session = Factory.getSSHSession(authenticationInfo, serverInfo); + Session session = getSshSession(); channelExec = ((ChannelExec) session.openChannel("exec")); channelExec.setCommand(command); channelExec.setInputStream(null); 
http://git-wip-us.apache.org/repos/asf/airavata/blob/644bf896/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java index cc357a0..5c8a65f 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java @@ -26,10 +26,8 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import org.apache.airavata.common.exception.AiravataException; -import javax.annotation.Nonnull; import java.util.List; import java.util.function.BiFunction; -import java.util.function.Function; /** * This is the basic consumer
