This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new f2a1248 GEODE-6267: Logged out subject f2a1248 is described below commit f2a124863e294d79e029857893d26bd7e7f69172 Author: Barry Oglesby <bogle...@users.noreply.github.com> AuthorDate: Wed Jan 23 08:57:31 2019 -0800 GEODE-6267: Logged out subject --- ...ientServerConnectDisconnectDistributedTest.java | 178 +++++++++++++++++++++ .../cache/tier/sockets/CacheClientProxy.java | 24 ++- .../cache/tier/sockets/ClientUserAuths.java | 13 ++ .../cache/tier/sockets/ServerConnection.java | 6 + 4 files changed, 219 insertions(+), 2 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java new file mode 100644 index 0000000..9c0086e --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.shiro.subject.Subject; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.examples.SimpleSecurityManager; +import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.SecurityTest; + +@Category({SecurityTest.class}) +public class ClientServerConnectDisconnectDistributedTest implements Serializable { + + @ClassRule + public static ClusterStartupRule cluster = new ClusterStartupRule(); + + private static MemberVM locator; + private static MemberVM server; + private static ClientVM client; + + private static List<Subject> serverConnectionSubjects; + + private static Subject proxySubject; + + private static List<ClientUserAuths> authorizations; + + @BeforeClass + public static void beforeClass() { + locator = cluster.startLocatorVM(0, l -> l.withSecurityManager(SimpleSecurityManager.class)); + int locatorPort = locator.getPort(); + server = cluster.startServerVM(1, s -> s.withCredential("cluster", "cluster") + .withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, + "ClientServerConnectDisconnectDistributedTest_region")); + } + + @Test + public void 
testClientConnectDisconnect() throws Exception { + // Connect client + int locatorPort = locator.getPort(); + client = cluster.startClientVM(3, c -> c.withCredential("data", "data") + .withPoolSubscription(true) + .withLocatorConnection(locatorPort)); + + // Do some puts + client.invoke(() -> { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY) + .create("ClientServerConnectDisconnectDistributedTest_region"); + for (int i = 0; i < 10; i++) { + Object key = String.valueOf(i); + region.put(key, key); + } + }); + + // Verify client sessions are logged in on the server + server.invoke(() -> verifySubjectsAreLoggedIn()); + + // Close client + client.invoke(() -> { + ClusterStartupRule.getClientCache().close(); + }); + + // Verify client sessions are logged out on the server + server.invoke(() -> verifySubjectsAreLoggedOut()); + } + + private void verifySubjectsAreLoggedIn() { + AcceptorImpl acceptor = getAcceptor(); + + // Verify ServerConnection subjects are logged in + verifyServerConnectionSubjectsAreLoggedIn(acceptor); + + // Verify CacheClientProxy subject is logged in + verifyCacheClientProxySubjectIsLoggedIn(acceptor); + } + + private AcceptorImpl getAcceptor() { + Cache cache = ClusterStartupRule.getCache(); + List<CacheServer> cacheServers = cache.getCacheServers(); + CacheServer cacheServer = cacheServers.get(0); + return ((CacheServerImpl) cacheServer).getAcceptor(); + } + + private void verifyServerConnectionSubjectsAreLoggedIn(AcceptorImpl acceptor) { + serverConnectionSubjects = new ArrayList<>(); + authorizations = new ArrayList<>(); + for (ServerConnection sc : acceptor.getAllServerConnections()) { + ClientUserAuths auth = sc.getClientUserAuths(); + assertThat(auth.getSubjects().size()).isNotEqualTo(0); + authorizations.add(auth); + for (Subject subject : auth.getSubjects()) { + assertThat(subject.getPrincipal()).isNotNull(); 
assertThat(subject.getPrincipals()).isNotNull(); + assertThat(subject.isAuthenticated()).isTrue(); + serverConnectionSubjects.add(subject); + } + } + } + + private void verifyCacheClientProxySubjectIsLoggedIn(AcceptorImpl acceptor) { + // Wait for the CacheClientProxy to be created since it's asynchronous + await().until(() -> acceptor.getCacheClientNotifier().getClientProxies().size() == 1); + CacheClientProxy proxy = acceptor.getCacheClientNotifier().getClientProxies().iterator().next(); + + // Check CacheClientProxy subject + proxySubject = proxy.getSubject(); + assertThat(proxySubject).isNotNull(); + assertThat(proxySubject.getPrincipal()).isNotNull(); + assertThat(proxySubject.getPrincipals()).isNotNull(); + assertThat(proxySubject.isAuthenticated()).isTrue(); + } + + private void verifySubjectsAreLoggedOut() { + AcceptorImpl acceptor = getAcceptor(); + + // Verify ServerConnection subjects are logged out + verifyServerConnectionSubjectsAreLoggedOut(acceptor); + + // Verify the CacheClientProxy subject is logged out + verifyCacheClientProxyIsLoggedOut(acceptor); + } + + private void verifyServerConnectionSubjectsAreLoggedOut(AcceptorImpl acceptor) { + // Wait for all ServerConnections to be closed since handleTermination is in the finally block + await().until(() -> acceptor.getAllServerConnections().isEmpty()); + + for (Subject subject : serverConnectionSubjects) { + assertThat(subject.getPrincipal()).isNull(); + assertThat(subject.getPrincipals()).isNull(); + assertThat(subject.isAuthenticated()).isFalse(); + } + + for (ClientUserAuths auth : authorizations) { + assertThat(auth.getSubjects().size()).isEqualTo(0); + } + } + + private void verifyCacheClientProxyIsLoggedOut(AcceptorImpl acceptor) { + // Wait for the CacheClientProxy to be closed since handleTermination is in the finally block + await().until(() -> acceptor.getCacheClientNotifier().getClientProxies().isEmpty()); + + assertThat(proxySubject.getPrincipal()).isNull(); + 
assertThat(proxySubject.getPrincipals()).isNull(); + assertThat(proxySubject.isAuthenticated()).isFalse(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index 2eea9a8..9d98313 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -45,6 +45,7 @@ import org.apache.shiro.util.ThreadState; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.StatisticsFactory; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.ClientSession; @@ -865,6 +866,7 @@ public class CacheClientProxy implements ClientSession { return; } + boolean closedSocket = false; try { if (logger.isDebugEnabled()) { logger.debug("{}: Terminating processing", this); @@ -912,7 +914,7 @@ public class CacheClientProxy implements ClientSession { // to fix bug 37684 // 1. check to see if dispatcher is still alive if (this._messageDispatcher.isAlive()) { - closeSocket(); + closedSocket = closeSocket(); destroyRQ(); alreadyDestroyed = true; this._messageDispatcher.interrupt(); @@ -941,7 +943,11 @@ public class CacheClientProxy implements ClientSession { } finally { // Close the statistics this._statistics.close(); // fix for bug 40105 - closeTransientFields(); // make sure this happens + if (closedSocket) { + closeOtherTransientFields(); + } else { + closeTransientFields(); // make sure this happens + } } } @@ -963,6 +969,10 @@ public class CacheClientProxy implements ClientSession { return; } + closeOtherTransientFields(); + } + + private void closeOtherTransientFields() { // Null out comm buffer, host address, ports and proxy id. 
All will be // replaced when the client reconnects. releaseCommBuffer(); @@ -981,6 +991,11 @@ public class CacheClientProxy implements ClientSession { // Commented to fix bug 40259 // this.clientVersion = null; closeNonDurableCqs(); + + // Logout the subject + if (this.subject != null) { + this.subject.logout(); + } } private void releaseCommBuffer() { @@ -1934,6 +1949,11 @@ public class CacheClientProxy implements ClientSession { return this.clientVersion; } + @VisibleForTesting + protected Subject getSubject() { + return this.subject; + } + protected void scheduleDurableExpirationTask() { SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUserAuths.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUserAuths.java index 2fbb391..5492e10 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUserAuths.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUserAuths.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -22,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.logging.log4j.Logger; import org.apache.shiro.subject.Subject; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.security.AuthorizeRequest; import org.apache.geode.internal.security.AuthorizeRequestPP; @@ -78,6 +81,11 @@ public class ClientUserAuths { return uniqueIdVsUserAuth.get(userId); } + @VisibleForTesting + protected Collection<Subject> getSubjects() { + return Collections.unmodifiableCollection(this.uniqueIdVsSubject.values()); + } + public Subject getSubject(long userId) { return 
uniqueIdVsSubject.get(userId); } @@ -189,6 +197,11 @@ public class ClientUserAuths { cleanUserAuth(userAuth); } } + + // Logout the subjects + for (Long subjectId : uniqueIdVsSubject.keySet()) { + removeSubject(subjectId); + } } public void fillPreviousCQAuth(ClientUserAuths previousClientUserAuths) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 4fe34bf..eae450a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -45,6 +45,7 @@ import org.apache.shiro.util.ThreadState; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.SystemFailure; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.UnsupportedVersionException; import org.apache.geode.cache.client.internal.AbstractOp; import org.apache.geode.cache.client.internal.Connection; @@ -1102,6 +1103,11 @@ public abstract class ServerConnection implements Runnable { } } + @VisibleForTesting + protected ClientUserAuths getClientUserAuths() { + return this.clientUserAuths; + } + private void setSecurityPart() { try { this.connectionId = randomConnectionIdGen.nextLong();