This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 66012400d KNOX-2788 - Implementing EmptyVerifier and Cleaning background thread (#620)
66012400d is described below

commit 66012400d4bf1e850e80d17709b8ab6a07f91a07
Author: MrtnBalazs <[email protected]>
AuthorDate: Mon Aug 22 11:38:18 2022 +0200

    KNOX-2788 - Implementing EmptyVerifier and Cleaning background thread (#620)
---
 .../gateway/config/impl/GatewayConfigImpl.java     |   7 +
 .../factory/ConcurrentSessionVerifierFactory.java  |  32 ++-
 .../control/EmptyConcurrentSessionVerifier.java}   |  37 ++--
 .../control/InMemoryConcurrentSessionVerifier.java |  39 +++-
 .../gateway/config/impl/GatewayConfigImplTest.java |  16 ++
 .../ConcurrentSessionVerifierFactoryTest.java      |  91 ++++++++
 .../InMemoryConcurrentSessionVerifierTest.java     | 242 ++++++++++++++++-----
 .../gateway/service/knoxsso/WebSSOResource.java    |   2 +-
 .../service/knoxsso/WebSSOResourceTest.java        |  32 ++-
 .../gateway/service/knoxsso/WebSSOutResource.java  |   4 +-
 .../org/apache/knox/gateway/GatewayTestConfig.java |   5 +
 .../apache/knox/gateway/config/GatewayConfig.java  |   2 +
 .../apache/knox/gateway/services/ServiceType.java  |   2 +-
 13 files changed, 415 insertions(+), 96 deletions(-)

diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index f2e8cd485..e96699882 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -303,6 +303,8 @@ public class GatewayConfigImpl extends Configuration 
implements GatewayConfig {
   private static final int 
GATEWAY_NON_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT_DEFAULT = 2;
   private static final String GATEWAY_PRIVILEGED_USERS = 
GATEWAY_CONFIG_FILE_PREFIX + "." + PRIVILEGED_USERS;
   private static final String GATEWAY_NON_PRIVILEGED_USERS = 
GATEWAY_CONFIG_FILE_PREFIX + "." + NON_PRIVILEGED_USERS;
+  private static final String 
GATEWAY_SESSION_VERIFICATION_EXPIRED_TOKENS_CLEANING_PERIOD = 
GATEWAY_CONFIG_FILE_PREFIX + 
".session.verification.expired.tokens.cleaning.period";
+  private static final long 
GATEWAY_SESSION_VERIFICATION_EXPIRED_TOKENS_CLEANING_PERIOD_DEFAULT = 
TimeUnit.MINUTES.toSeconds(30);
 
   public GatewayConfigImpl() {
     init();
@@ -1382,4 +1384,9 @@ public class GatewayConfigImpl extends Configuration 
implements GatewayConfig {
     final Collection<String> nonPrivilegedUsers = 
getTrimmedStringCollection(GATEWAY_NON_PRIVILEGED_USERS);
     return nonPrivilegedUsers == null ? Collections.emptySet() : new 
HashSet<>(nonPrivilegedUsers);
   }
+
+  @Override
+  public long getConcurrentSessionVerifierExpiredTokensCleaningPeriod() {
+    return 
getLong(GATEWAY_SESSION_VERIFICATION_EXPIRED_TOKENS_CLEANING_PERIOD, 
GATEWAY_SESSION_VERIFICATION_EXPIRED_TOKENS_CLEANING_PERIOD_DEFAULT);
+  }
 }
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
index 0e84dc2ff..e3c2fc372 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
@@ -17,21 +17,37 @@
  */
 package org.apache.knox.gateway.services.factory;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.Collection;
+import java.util.Map;
+
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.services.GatewayServices;
 import org.apache.knox.gateway.services.Service;
 import org.apache.knox.gateway.services.ServiceLifecycleException;
 import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.session.control.EmptyConcurrentSessionVerifier;
 import 
org.apache.knox.gateway.session.control.InMemoryConcurrentSessionVerifier;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
 public class ConcurrentSessionVerifierFactory extends AbstractServiceFactory {
   @Override
   protected Service createService(GatewayServices gatewayServices, ServiceType 
serviceType, GatewayConfig gatewayConfig, Map<String, String> options, String 
implementation) throws ServiceLifecycleException {
-    return shouldCreateService(implementation) ? new 
InMemoryConcurrentSessionVerifier() : null;
+    Service service = null;
+    if (shouldCreateService(implementation)) {
+      if (matchesImplementation(implementation, 
EmptyConcurrentSessionVerifier.class, true)) {
+        service = new EmptyConcurrentSessionVerifier();
+      } else if (matchesImplementation(implementation, 
InMemoryConcurrentSessionVerifier.class)) {
+        if (isThereGroupConfigured(gatewayConfig)) {
+          service = new InMemoryConcurrentSessionVerifier();
+        } else {
+          throw new ServiceLifecycleException("Error creating 
InMemoryConcurrentSessionVerifier, at least one user should be added in either 
the privileged group or the non-privileged group!");
+        }
+      }
+      logServiceUsage(implementation, serviceType);
+    }
+    return service;
   }
 
   @Override
@@ -41,6 +57,10 @@ public class ConcurrentSessionVerifierFactory extends 
AbstractServiceFactory {
 
   @Override
   protected Collection<String> getKnownImplementations() {
-    return 
Collections.singleton(InMemoryConcurrentSessionVerifier.class.getName());
+    return 
unmodifiableList(asList(InMemoryConcurrentSessionVerifier.class.getName(), 
EmptyConcurrentSessionVerifier.class.getName()));
+  }
+
+  private boolean isThereGroupConfigured(GatewayConfig gatewayConfig) {
+    return !gatewayConfig.getNonPrivilegedUsers().isEmpty() || 
!gatewayConfig.getPrivilegedUsers().isEmpty();
   }
 }
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/session/control/EmptyConcurrentSessionVerifier.java
similarity index 50%
copy from 
gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
copy to 
gateway-server/src/main/java/org/apache/knox/gateway/session/control/EmptyConcurrentSessionVerifier.java
index 0e84dc2ff..856e18670 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactory.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/session/control/EmptyConcurrentSessionVerifier.java
@@ -15,32 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.knox.gateway.services.factory;
+package org.apache.knox.gateway.session.control;
+
+import java.util.Map;
 
 import org.apache.knox.gateway.config.GatewayConfig;
-import org.apache.knox.gateway.services.GatewayServices;
-import org.apache.knox.gateway.services.Service;
 import org.apache.knox.gateway.services.ServiceLifecycleException;
-import org.apache.knox.gateway.services.ServiceType;
-import 
org.apache.knox.gateway.session.control.InMemoryConcurrentSessionVerifier;
+import org.apache.knox.gateway.services.security.token.impl.JWT;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
+public class EmptyConcurrentSessionVerifier implements 
ConcurrentSessionVerifier {
+  @Override
+  public void init(GatewayConfig config, Map<String, String> options) throws 
ServiceLifecycleException {
+
+  }
 
-public class ConcurrentSessionVerifierFactory extends AbstractServiceFactory {
   @Override
-  protected Service createService(GatewayServices gatewayServices, ServiceType 
serviceType, GatewayConfig gatewayConfig, Map<String, String> options, String 
implementation) throws ServiceLifecycleException {
-    return shouldCreateService(implementation) ? new 
InMemoryConcurrentSessionVerifier() : null;
+  public void start() throws ServiceLifecycleException {
+
+  }
+
+  @Override
+  public void stop() throws ServiceLifecycleException {
+
   }
 
   @Override
-  protected ServiceType getServiceType() {
-    return ServiceType.CONCURRENT_SESSION_VERIFIER;
+  public boolean verifySessionForUser(String username, JWT JWToken) {
+    return true;
   }
 
   @Override
-  protected Collection<String> getKnownImplementations() {
-    return 
Collections.singleton(InMemoryConcurrentSessionVerifier.class.getName());
+  public void sessionEndedForUser(String username, String token) {
+
   }
-}
+}
\ No newline at end of file
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifier.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifier.java
index bf0ec4df4..d033035ed 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifier.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifier.java
@@ -18,23 +18,27 @@
 package org.apache.knox.gateway.session.control;
 
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.knox.gateway.GatewayMessages;
-import org.apache.knox.gateway.config.GatewayConfig;
-import org.apache.knox.gateway.i18n.messages.MessagesFactory;
-import org.apache.knox.gateway.services.ServiceLifecycleException;
-import org.apache.knox.gateway.services.security.token.impl.JWT;
-
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.knox.gateway.GatewayMessages;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.services.ServiceLifecycleException;
+import org.apache.knox.gateway.services.security.token.impl.JWT;
+
 public class InMemoryConcurrentSessionVerifier implements 
ConcurrentSessionVerifier {
   private static final GatewayMessages LOG = 
MessagesFactory.get(GatewayMessages.class);
 
@@ -44,6 +48,8 @@ public class InMemoryConcurrentSessionVerifier implements 
ConcurrentSessionVerif
   private int nonPrivilegedUserConcurrentSessionLimit;
   private Map<String, Set<SessionJWT>> concurrentSessionCounter;
   private final Lock sessionCountModifyLock = new ReentrantLock();
+  private long cleaningPeriod;
+  private final ScheduledExecutorService expiredTokenRemover = 
Executors.newSingleThreadScheduledExecutor();
 
   @Override
   public boolean verifySessionForUser(String username, JWT jwtToken) {
@@ -122,17 +128,34 @@ public class InMemoryConcurrentSessionVerifier implements 
ConcurrentSessionVerif
     this.nonPrivilegedUsers = config.getNonPrivilegedUsers();
     this.privilegedUserConcurrentSessionLimit = 
config.getPrivilegedUsersConcurrentSessionLimit();
     this.nonPrivilegedUserConcurrentSessionLimit = 
config.getNonPrivilegedUsersConcurrentSessionLimit();
+    this.cleaningPeriod = 
config.getConcurrentSessionVerifierExpiredTokensCleaningPeriod();
     this.concurrentSessionCounter = new ConcurrentHashMap<>();
   }
 
   @Override
   public void start() throws ServiceLifecycleException {
-
+    expiredTokenRemover.scheduleAtFixedRate(this::removeExpiredTokens, 
cleaningPeriod, cleaningPeriod, TimeUnit.SECONDS);
   }
 
   @Override
   public void stop() throws ServiceLifecycleException {
+    expiredTokenRemover.shutdown();
+  }
 
+  private void removeExpiredTokens() {
+    sessionCountModifyLock.lock();
+    try {
+      Iterator<Map.Entry<String, Set<SessionJWT>>> 
concurrentSessionCounterIterator = 
concurrentSessionCounter.entrySet().iterator();
+      while (concurrentSessionCounterIterator.hasNext()) {
+        Set<SessionJWT> sessionJWTSet = 
concurrentSessionCounterIterator.next().getValue();
+        sessionJWTSet.removeIf(session -> session.hasExpired());
+        if (sessionJWTSet.isEmpty()) {
+          concurrentSessionCounterIterator.remove();
+        }
+      }
+    } finally {
+      sessionCountModifyLock.unlock();
+    }
   }
 
   public static class SessionJWT {
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
index e12b49ddf..e84b99b95 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
@@ -498,4 +498,20 @@ public class GatewayConfigImplTest {
     hosts.add("127.0.0.1");
     assertThat(config.getGatewayHost(), is(hosts));
   }
+
+  @Test
+  public void 
testDefaultConcurrentSessionVerifierExpiredTokensCleaningPeriodParameter() {
+    GatewayConfigImpl config = new GatewayConfigImpl();
+
+    
assertThat(config.getConcurrentSessionVerifierExpiredTokensCleaningPeriod(), 
is(TimeUnit.MINUTES.toSeconds(30)));
+  }
+
+  @Test
+  public void 
testConcurrentSessionVerifierExpiredTokensCleaningPeriodParameter() {
+    GatewayConfigImpl config = new GatewayConfigImpl();
+
+    config.set("gateway.session.verification.expired.tokens.cleaning.period", 
"1000");
+    
assertThat(config.getConcurrentSessionVerifierExpiredTokensCleaningPeriod(), 
is(1000L));
+  }
+
 }
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactoryTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactoryTest.java
new file mode 100644
index 000000000..66daaa208
--- /dev/null
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/services/factory/ConcurrentSessionVerifierFactoryTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.services.factory;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.services.ServiceLifecycleException;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.session.control.ConcurrentSessionVerifier;
+import org.apache.knox.gateway.session.control.EmptyConcurrentSessionVerifier;
+import 
org.apache.knox.gateway.session.control.InMemoryConcurrentSessionVerifier;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConcurrentSessionVerifierFactoryTest extends ServiceFactoryTest {
+  private final ConcurrentSessionVerifierFactory serviceFactory = new 
ConcurrentSessionVerifierFactory();
+
+  @Before
+  public void setUp() throws Exception {
+    initConfig();
+  }
+
+  @Test
+  public void testBasics() throws Exception {
+    super.testBasics(serviceFactory, ServiceType.MASTER_SERVICE, 
ServiceType.CONCURRENT_SESSION_VERIFIER);
+  }
+
+  @Test
+  public void testShouldReturnEmptyConcurrentSessionVerifier() throws 
Exception {
+    GatewayConfig configForVerifier = mockConfig(Collections.emptySet(), 
Collections.emptySet());
+
+    ConcurrentSessionVerifier concurrentSessionVerifier = 
(ConcurrentSessionVerifier) serviceFactory.create(gatewayServices, 
ServiceType.CONCURRENT_SESSION_VERIFIER, configForVerifier, null, "");
+    assertTrue(concurrentSessionVerifier instanceof 
EmptyConcurrentSessionVerifier);
+    concurrentSessionVerifier = (ConcurrentSessionVerifier) 
serviceFactory.create(gatewayServices, ServiceType.CONCURRENT_SESSION_VERIFIER, 
configForVerifier, null, EmptyConcurrentSessionVerifier.class.getName());
+    assertTrue(concurrentSessionVerifier instanceof 
EmptyConcurrentSessionVerifier);
+  }
+
+  @Test
+  public void testShouldReturnInMemoryConcurrentSessionVerifier() throws 
Exception {
+    GatewayConfig configForVerifier = mockConfig(new 
HashSet<>(Arrays.asList("admin")), Collections.emptySet());
+
+    ConcurrentSessionVerifier concurrentSessionVerifier = 
(ConcurrentSessionVerifier) serviceFactory.create(gatewayServices, 
ServiceType.CONCURRENT_SESSION_VERIFIER, configForVerifier, null, 
InMemoryConcurrentSessionVerifier.class.getName());
+    assertTrue(concurrentSessionVerifier instanceof 
InMemoryConcurrentSessionVerifier);
+
+    configForVerifier = mockConfig(Collections.emptySet(), new 
HashSet<>(Arrays.asList("tom")));
+
+    concurrentSessionVerifier = (ConcurrentSessionVerifier) 
serviceFactory.create(gatewayServices, ServiceType.CONCURRENT_SESSION_VERIFIER, 
configForVerifier, null, InMemoryConcurrentSessionVerifier.class.getName());
+    assertTrue(concurrentSessionVerifier instanceof 
InMemoryConcurrentSessionVerifier);
+  }
+
+  @Test
+  public void testShouldThrowException() {
+    GatewayConfig configForVerifier = mockConfig(Collections.emptySet(), 
Collections.emptySet());
+
+    assertThrows(ServiceLifecycleException.class, () -> {
+      serviceFactory.create(gatewayServices, 
ServiceType.CONCURRENT_SESSION_VERIFIER, configForVerifier, null, 
InMemoryConcurrentSessionVerifier.class.getName());
+    });
+  }
+
+  private GatewayConfig mockConfig(Set<String> privilegedUsers, Set<String> 
nonPrivilegedUsers) {
+    GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+    
EasyMock.expect(config.getPrivilegedUsers()).andReturn(privilegedUsers).anyTimes();
+    
EasyMock.expect(config.getNonPrivilegedUsers()).andReturn(nonPrivilegedUsers).anyTimes();
+    EasyMock.replay(config);
+    return config;
+  }
+
+}
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifierTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifierTest.java
index 4cf1f2c75..c16128a25 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifierTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/session/control/InMemoryConcurrentSessionVerifierTest.java
@@ -17,6 +17,22 @@
  */
 package org.apache.knox.gateway.session.control;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.services.ServiceLifecycleException;
 import org.apache.knox.gateway.services.security.AliasService;
@@ -33,17 +49,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 public class InMemoryConcurrentSessionVerifierTest {
+  private final long DEFAULT_TEST_EXPIRY_PERIOD = 1000;
   private InMemoryConcurrentSessionVerifier verifier;
   private Map<String, String> options;
   private DefaultTokenAuthorityService tokenAuthority;
@@ -101,26 +108,9 @@ public class InMemoryConcurrentSessionVerifierTest {
     tokenAuthority.init(config, new HashMap<>());
     tokenAuthority.start();
 
-    jwtAttributesForAdmin = new JWTokenAttributesBuilder()
-            .setIssuer("KNOXSSO")
-            .setUserName("admin")
-            .setAudiences(new ArrayList<>())
-            .setAlgorithm("RS256")
-            .setExpires(-1)
-            .setSigningKeystoreName(null)
-            .setSigningKeystoreAlias(null)
-            .setSigningKeystorePassphrase(null)
-            .build();
-    jwtAttributesForTom = new JWTokenAttributesBuilder()
-            .setIssuer("KNOXSSO")
-            .setUserName("tom")
-            .setAudiences(new ArrayList<>())
-            .setAlgorithm("RS256")
-            .setExpires(-1)
-            .setSigningKeystoreName(null)
-            .setSigningKeystoreAlias(null)
-            .setSigningKeystorePassphrase(null)
-            .build();
+    jwtAttributesForAdmin = makeJwtAttribute("admin", false);
+    jwtAttributesForTom = makeJwtAttribute("tom", false);
+
     try {
       adminToken1 = tokenAuthority.issueToken(jwtAttributesForAdmin);
       adminToken2 = tokenAuthority.issueToken(jwtAttributesForAdmin);
@@ -148,6 +138,20 @@ public class InMemoryConcurrentSessionVerifierTest {
     return config;
   }
 
+  private JWTokenAttributes makeJwtAttribute(String username, boolean 
expiring) {
+    long expiryTime = expiring ? System.currentTimeMillis() + 
DEFAULT_TEST_EXPIRY_PERIOD : -1;
+    return new JWTokenAttributesBuilder()
+            .setIssuer("KNOXSSO")
+            .setUserName(username)
+            .setAudiences(new ArrayList<>())
+            .setAlgorithm("RS256")
+            .setExpires(expiryTime)
+            .setSigningKeystoreName(null)
+            .setSigningKeystoreAlias(null)
+            .setSigningKeystorePassphrase(null)
+            .build();
+  }
+
   /**
    * The goal for this test is to prove that if the user is not configured for 
either of the groups then
    * neither of the limits apply to him, he can have unlimited sessions.
@@ -272,16 +276,7 @@ public class InMemoryConcurrentSessionVerifierTest {
     GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), 
new HashSet<>(Arrays.asList("tom", "guest")), 3, 3);
     verifier.init(config, options);
 
-    JWTokenAttributes expiringJwtAttributesForTom = new 
JWTokenAttributesBuilder()
-            .setIssuer("KNOXSSO")
-            .setUserName("tom")
-            .setAudiences(new ArrayList<>())
-            .setAlgorithm("RS256")
-            .setExpires(System.currentTimeMillis() + 1000)
-            .setSigningKeystoreName(null)
-            .setSigningKeystoreAlias(null)
-            .setSigningKeystorePassphrase(null)
-            .build();
+    JWTokenAttributes expiringJwtAttributesForTom = makeJwtAttribute("tom", 
true);
 
     JWT tomToken = tokenAuthority.issueToken(jwtAttributesForTom);
     verifier.verifySessionForUser("tom", tomToken);
@@ -295,16 +290,7 @@ public class InMemoryConcurrentSessionVerifierTest {
     Thread.sleep(1000L);
     Assert.assertEquals(1, verifier.countValidTokensForUser("tom"));
 
-    JWTokenAttributes expiringJwtAttributesForAdmin = new 
JWTokenAttributesBuilder()
-            .setIssuer("KNOXSSO")
-            .setUserName("admin")
-            .setAudiences(new ArrayList<>())
-            .setAlgorithm("RS256")
-            .setExpires(System.currentTimeMillis() + 1000)
-            .setSigningKeystoreName(null)
-            .setSigningKeystoreAlias(null)
-            .setSigningKeystorePassphrase(null)
-            .build();
+    JWTokenAttributes expiringJwtAttributesForAdmin = 
makeJwtAttribute("admin", true);
 
     JWT adminToken = tokenAuthority.issueToken(jwtAttributesForAdmin);
     verifier.verifySessionForUser("admin", adminToken);
@@ -318,6 +304,162 @@ public class InMemoryConcurrentSessionVerifierTest {
     Thread.sleep(1000L);
     Assert.assertEquals(1, verifier.countValidTokensForUser("admin"));
   }
+
+  @Test
+  public void testBackgroundThreadRemoveExpiredTokens() throws 
ServiceLifecycleException, TokenServiceException, InterruptedException {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), 
new HashSet<>(Arrays.asList("tom", "guest")), 3, 3);
+    verifier.init(config, options);
+
+    JWTokenAttributes expiringJwtAttributesForAdmin = 
makeJwtAttribute("admin", true);
+
+    verifier.verifySessionForUser("admin", adminToken1);
+    verifier.verifySessionForUser("admin", adminToken2);
+    JWT expiringAdminToken = 
tokenAuthority.issueToken(expiringJwtAttributesForAdmin);
+    verifier.verifySessionForUser("admin", expiringAdminToken);
+    Assert.assertEquals(3, verifier.countValidTokensForUser("admin"));
+    Thread.sleep(1100);
+    Assert.assertEquals(2, verifier.countValidTokensForUser("admin"));
+
+    JWTokenAttributes expiringJwtAttributesForTom = makeJwtAttribute("tom", 
true);
+
+    verifier.verifySessionForUser("tom", tomToken1);
+    verifier.verifySessionForUser("tom", tomToken2);
+    JWT expiringTomToken = 
tokenAuthority.issueToken(expiringJwtAttributesForTom);
+    verifier.verifySessionForUser("tom", expiringTomToken);
+    Assert.assertEquals(3, verifier.countValidTokensForUser("tom"));
+    Thread.sleep(1100);
+    Assert.assertEquals(2, verifier.countValidTokensForUser("tom"));
+  }
+
+  @SuppressWarnings("PMD.DoNotUseThreads")
+  @Test
+  public void testPrivilegedLoginLogoutStress() throws 
ServiceLifecycleException, InterruptedException {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), 
new HashSet<>(Arrays.asList("tom", "guest")), 256, 256);
+    verifier.init(config, options);
+
+    ExecutorService executor = Executors.newFixedThreadPool(128);
+    BlockingQueue<JWT> tokenStorage = new ArrayBlockingQueue<>(256);
+    CyclicBarrier barrier = new CyclicBarrier(128);
+
+    Runnable privilegedLogin = () -> {
+      JWT token;
+      try {
+        token = tokenAuthority.issueToken(jwtAttributesForAdmin);
+        tokenStorage.add(token);
+        barrier.await();
+      } catch (InterruptedException | BrokenBarrierException | 
TokenServiceException e) {
+        throw new RuntimeException(e);
+      }
+      verifier.verifySessionForUser("admin", token);
+    };
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(privilegedLogin);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(128, verifier.countValidTokensForUser("admin"));
+
+    Runnable privilegedLogout = () -> {
+      JWT token;
+      try {
+        token = tokenStorage.take();
+        barrier.await();
+      } catch (InterruptedException | BrokenBarrierException e) {
+        throw new RuntimeException(e);
+      }
+      verifier.sessionEndedForUser("admin", String.valueOf(token));
+    };
+
+    for (int i = 0; i < 64; i++) {
+      executor.submit(privilegedLogin);
+    }
+    for (int i = 0; i < 64; i++) {
+      executor.submit(privilegedLogout);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(128, verifier.countValidTokensForUser("admin"));
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(privilegedLogout);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(0, verifier.countValidTokensForUser("admin"));
+
+    config = mockConfig(new HashSet<>(Arrays.asList("admin")), new 
HashSet<>(Arrays.asList("tom", "guest")), 10, 10);
+    verifier.init(config, options);
+    tokenStorage.clear();
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(privilegedLogin);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(10, verifier.countValidTokensForUser("admin"));
+  }
+
+  @SuppressWarnings("PMD.DoNotUseThreads")
+  @Test
+  public void testNonPrivilegedLoginLogoutStress() throws 
ServiceLifecycleException, InterruptedException {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), 
new HashSet<>(Arrays.asList("tom", "guest")), 256, 256);
+    verifier.init(config, options);
+
+    ExecutorService executor = Executors.newFixedThreadPool(128);
+    BlockingQueue<JWT> tokenStorage = new ArrayBlockingQueue<>(256);
+    CyclicBarrier barrier = new CyclicBarrier(128);
+
+    Runnable nonPrivilegedLogin = () -> {
+      JWT token;
+      try {
+        token = tokenAuthority.issueToken(jwtAttributesForTom);
+        tokenStorage.add(token);
+        barrier.await();
+      } catch (InterruptedException | BrokenBarrierException | 
TokenServiceException e) {
+        throw new RuntimeException(e);
+      }
+      verifier.verifySessionForUser("tom", token);
+    };
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(nonPrivilegedLogin);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(128, verifier.countValidTokensForUser("tom"));
+
+    Runnable nonPrivilegedLogout = () -> {
+      JWT token;
+      try {
+        token = tokenStorage.take();
+        barrier.await();
+      } catch (InterruptedException | BrokenBarrierException e) {
+        throw new RuntimeException(e);
+      }
+      verifier.sessionEndedForUser("tom", String.valueOf(token));
+    };
+
+    for (int i = 0; i < 64; i++) {
+      executor.submit(nonPrivilegedLogin);
+    }
+    for (int i = 0; i < 64; i++) {
+      executor.submit(nonPrivilegedLogout);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(128, verifier.countValidTokensForUser("tom"));
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(nonPrivilegedLogout);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(0, verifier.countValidTokensForUser("tom"));
+
+    config = mockConfig(new HashSet<>(Arrays.asList("admin")), new 
HashSet<>(Arrays.asList("tom", "guest")), 10, 10);
+    verifier.init(config, options);
+    tokenStorage.clear();
+
+    for (int i = 0; i < 128; i++) {
+      executor.submit(nonPrivilegedLogin);
+    }
+    Thread.sleep(1000L);
+    Assert.assertEquals(10, verifier.countValidTokensForUser("tom"));
+  }
 }
 
 
diff --git 
a/gateway-service-knoxsso/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOResource.java
 
b/gateway-service-knoxsso/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOResource.java
index 59af1c6d3..4aad6e942 100644
--- 
a/gateway-service-knoxsso/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOResource.java
+++ 
b/gateway-service-knoxsso/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOResource.java
@@ -292,7 +292,7 @@ public class WebSSOResource {
       // Coverity CID 1327959
       if (token != null) {
         ConcurrentSessionVerifier verifier = services.getService(ServiceType.CONCURRENT_SESSION_VERIFIER);
-        if (verifier != null && !verifier.verifySessionForUser(p.getName(), token)) {
+        if (!verifier.verifySessionForUser(p.getName(), token)) {
           throw new WebApplicationException("Too many sessions for user: " + request.getUserPrincipal().getName(), Response.Status.FORBIDDEN);
         }
         addJWTHadoopCookie(original, token);
diff --git a/gateway-service-knoxsso/src/test/java/org/apache/knox/gateway/service/knoxsso/WebSSOResourceTest.java b/gateway-service-knoxsso/src/test/java/org/apache/knox/gateway/service/knoxsso/WebSSOResourceTest.java
index a3774fd77..126b4d119 100644
--- a/gateway-service-knoxsso/src/test/java/org/apache/knox/gateway/service/knoxsso/WebSSOResourceTest.java
+++ b/gateway-service-knoxsso/src/test/java/org/apache/knox/gateway/service/knoxsso/WebSSOResourceTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import javax.servlet.ServletContext;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.Cookie;
@@ -50,15 +51,6 @@ import javax.servlet.http.HttpServletResponseWrapper;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 
-import com.nimbusds.jose.JOSEException;
-import com.nimbusds.jose.JOSEObjectType;
-import com.nimbusds.jose.JWSSigner;
-import com.nimbusds.jose.JWSVerifier;
-import com.nimbusds.jose.KeyLengthException;
-import com.nimbusds.jose.crypto.MACSigner;
-import com.nimbusds.jose.crypto.MACVerifier;
-import com.nimbusds.jose.crypto.RSASSASigner;
-import com.nimbusds.jose.crypto.RSASSAVerifier;
 import org.apache.http.HttpStatus;
 import org.apache.knox.gateway.audit.log4j.audit.Log4jAuditor;
 import org.apache.knox.gateway.config.GatewayConfig;
@@ -78,6 +70,16 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.nimbusds.jose.JOSEException;
+import com.nimbusds.jose.JOSEObjectType;
+import com.nimbusds.jose.JWSSigner;
+import com.nimbusds.jose.JWSVerifier;
+import com.nimbusds.jose.KeyLengthException;
+import com.nimbusds.jose.crypto.MACSigner;
+import com.nimbusds.jose.crypto.MACVerifier;
+import com.nimbusds.jose.crypto.RSASSASigner;
+import com.nimbusds.jose.crypto.RSASSAVerifier;
+
 /**
  * Some tests for the Knox SSO service.
  */
@@ -552,6 +554,10 @@ public class WebSSOResourceTest {
     EasyMock.expect(services.getService(ServiceType.ALIAS_SERVICE)).andReturn(aliasService).anyTimes();
     EasyMock.expect(aliasService.getPasswordFromAliasForGateway(TokenUtils.SIGNING_HMAC_SECRET_ALIAS)).andReturn(null).anyTimes();
 
+    ConcurrentSessionVerifier concurrentSessionVerifier = EasyMock.createNiceMock(ConcurrentSessionVerifier.class);
+    EasyMock.expect(concurrentSessionVerifier.verifySessionForUser(anyString(), anyObject())).andReturn(true).anyTimes();
+    EasyMock.expect(services.getService(ServiceType.CONCURRENT_SESSION_VERIFIER)).andReturn(concurrentSessionVerifier).anyTimes();
+
     JWTokenAuthority authority = new TestJWTokenAuthority(gatewayPublicKey, gatewayPrivateKey);
     EasyMock.expect(services.getService(ServiceType.TOKEN_SERVICE)).andReturn(authority).anyTimes();
 
@@ -559,7 +565,7 @@ public class WebSSOResourceTest {
     ServletOutputStream outputStream = EasyMock.createNiceMock(ServletOutputStream.class);
     CookieResponseWrapper responseWrapper = new CookieResponseWrapper(response, outputStream);
 
-    EasyMock.replay(principal, services, context, contextNoParam, request, aliasService);
+    EasyMock.replay(principal, services, context, contextNoParam, request, aliasService, concurrentSessionVerifier);
 
     /* declare knoxtoken as part of knoxsso param so it is stripped from the final url */
     WebSSOResource webSSOResponse = new WebSSOResource();
@@ -666,11 +672,15 @@ public class WebSSOResourceTest {
     EasyMock.expect(aliasService.getPasswordFromAliasForGateway(TokenUtils.SIGNING_HMAC_SECRET_ALIAS)).andReturn(null).anyTimes();
     EasyMock.expect(services.getService(ServiceType.ALIAS_SERVICE)).andReturn(aliasService).anyTimes();
 
+    ConcurrentSessionVerifier concurrentSessionVerifier = EasyMock.createNiceMock(ConcurrentSessionVerifier.class);
+    EasyMock.expect(concurrentSessionVerifier.verifySessionForUser(anyString(), anyObject())).andReturn(true).anyTimes();
+    EasyMock.expect(services.getService(ServiceType.CONCURRENT_SESSION_VERIFIER)).andReturn(concurrentSessionVerifier).anyTimes();
+
     HttpServletResponse response = EasyMock.createNiceMock(HttpServletResponse.class);
     ServletOutputStream outputStream = EasyMock.createNiceMock(ServletOutputStream.class);
     CookieResponseWrapper responseWrapper = new CookieResponseWrapper(response, outputStream);
 
-    EasyMock.replay(principal, services, context, request, aliasService);
+    EasyMock.replay(principal, services, context, request, aliasService, concurrentSessionVerifier);
 
     WebSSOResource webSSOResponse = new WebSSOResource();
     webSSOResponse.request = request;
diff --git a/gateway-service-knoxssout/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOutResource.java b/gateway-service-knoxssout/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOutResource.java
index 134972c15..0ce423760 100644
--- a/gateway-service-knoxssout/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOutResource.java
+++ b/gateway-service-knoxssout/src/main/java/org/apache/knox/gateway/service/knoxsso/WebSSOutResource.java
@@ -117,9 +117,7 @@ public class WebSSOutResource {
               (GatewayServices) request.getServletContext().getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE);
       if (gwServices != null) {
         ConcurrentSessionVerifier verifier = gwServices.getService(ServiceType.CONCURRENT_SESSION_VERIFIER);
-        if (verifier != null) {
-          verifier.sessionEndedForUser(request.getUserPrincipal().getName(), ssoCookie.get().getValue());
-        }
+        verifier.sessionEndedForUser(request.getUserPrincipal().getName(), ssoCookie.get().getValue());
       }
     } else {
       log.couldNotFindCookieWithTokenToRemove(cookieName, request.getUserPrincipal().getName());
diff --git a/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 96a9a90fd..8196801fa 100644
--- a/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -973,4 +973,9 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
   public Set<String> getNonPrivilegedUsers() {
     return null;
   }
+
+  @Override
+  public long getConcurrentSessionVerifierExpiredTokensCleaningPeriod() {
+    return 0;
+  }
 }
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 70d0f00cc..63f0201bc 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -823,4 +823,6 @@ public interface GatewayConfig {
   Set<String> getPrivilegedUsers();
 
   Set<String> getNonPrivilegedUsers();
+
+  long getConcurrentSessionVerifierExpiredTokensCleaningPeriod();
 }
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/services/ServiceType.java b/gateway-spi/src/main/java/org/apache/knox/gateway/services/ServiceType.java
index 14a1da5d8..a41f92429 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/services/ServiceType.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/services/ServiceType.java
@@ -36,7 +36,7 @@ public enum ServiceType {
   TOKEN_SERVICE("TokenService"),
   TOKEN_STATE_SERVICE("TokenStateService"),
   TOPOLOGY_SERVICE("TopologyService"),
-  CONCURRENT_SESSION_VERIFIER("ConcurrentSessionCounter");
+  CONCURRENT_SESSION_VERIFIER("ConcurrentSessionVerifier");
 
   private final String serviceTypeName;
   private final String shortName;

Reply via email to