This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a8bc0a3 GEODE-6287: Cleaned up FilterProfile clientMap when client
disconnects normally
a8bc0a3 is described below
commit a8bc0a389e82c57f729e09fd0c19e50828054a8a
Author: Barry Oglesby <[email protected]>
AuthorDate: Mon Jan 28 13:37:24 2019 -0800
GEODE-6287: Cleaned up FilterProfile clientMap when client disconnects
normally
---
...ientServerConnectDisconnectDistributedTest.java | 116 ++++++++++++++++-----
.../apache/geode/internal/cache/FilterProfile.java | 38 +++++++
2 files changed, 127 insertions(+), 27 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
index 9c0086e..222b1f0 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
@@ -20,35 +20,35 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.shiro.subject.Subject;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.examples.SimpleSecurityManager;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
@Category({SecurityTest.class})
public class ClientServerConnectDisconnectDistributedTest implements
Serializable {
- @ClassRule
- public static ClusterStartupRule cluster = new ClusterStartupRule();
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule();
- private static MemberVM locator;
- private static MemberVM server;
- private static ClientVM client;
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
private static List<Subject> serverConnectionSubjects;
@@ -56,28 +56,26 @@ public class ClientServerConnectDisconnectDistributedTest
implements Serializabl
private static List<ClientUserAuths> authorizations;
- @BeforeClass
- public static void beforeClass() {
- locator = cluster.startLocatorVM(0, l ->
l.withSecurityManager(SimpleSecurityManager.class));
+ @Test
+ public void testSubjectsLoggedOutOnClientConnectDisconnect() throws
Exception {
+ // Start Locator
+ MemberVM locator =
+ cluster.startLocatorVM(0, l ->
l.withSecurityManager(SimpleSecurityManager.class));
+
+ // Start server
int locatorPort = locator.getPort();
- server = cluster.startServerVM(1, s -> s.withCredential("cluster",
"cluster")
-
.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION,
- "ClientServerConnectDisconnectDistributedTest_region"));
- }
+ String regionName = testName.getMethodName() + "_region";
+ MemberVM server = cluster.startServerVM(1, s ->
s.withCredential("cluster", "cluster")
+
.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION,
regionName));
- @Test
- public void testClientConnectDisconnect() throws Exception {
// Connect client
- int locatorPort = locator.getPort();
- client = cluster.startClientVM(3, c -> c.withCredential("data", "data")
+ ClientVM client = cluster.startClientVM(3, c -> c.withCredential("data",
"data")
.withPoolSubscription(true)
.withLocatorConnection(locatorPort));
// Do some puts
client.invoke(() -> {
- ClientCache cache = ClusterStartupRule.getClientCache();
- Region region =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
- .create("ClientServerConnectDisconnectDistributedTest_region");
+ Region region =
ClusterStartupRule.clientCacheRule.createProxyRegion(regionName);
for (int i = 0; i < 10; i++) {
Object key = String.valueOf(i);
region.put(key, key);
@@ -96,6 +94,41 @@ public class ClientServerConnectDisconnectDistributedTest
implements Serializabl
server.invoke(() -> verifySubjectsAreLoggedOut());
}
+ @Test
+ public void testFilterProfileCleanupOnClientConnectDisconnect() throws
Exception {
+ // Start Locator
+ MemberVM locator = cluster.startLocatorVM(0);
+
+ // Start server
+ int locatorPort = locator.getPort();
+ String regionName = testName.getMethodName() + "_region";
+ MemberVM server = cluster.startServerVM(1, s ->
s.withConnectionToLocator(locatorPort)
+ .withRegion(RegionShortcut.PARTITION, regionName));
+
+ // Connect client
+ ClientVM client = cluster.startClientVM(3,
+ c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+
+ // Create client region and register interest
+ client.invoke(() -> {
+
ClusterStartupRule.clientCacheRule.createProxyRegion(regionName).registerInterestForAllKeys();
+ });
+
+ // Verify proxy id is registered in filter profile
+ server.invoke(() -> verifyRealAndWireProxyIdsInFilterProfile(regionName,
1));
+
+ // Close client
+ client.invoke(() -> {
+ ClusterStartupRule.getClientCache().close();
+ });
+
+ // Wait for CacheClientProxy to be closed
+ server.invoke(() -> waitForCacheClientProxyToBeClosed());
+
+ // Verify proxy id is unregistered from filter profile
+ server.invoke(() -> verifyRealAndWireProxyIdsInFilterProfile(regionName,
0));
+ }
+
private void verifySubjectsAreLoggedIn() {
AcceptorImpl acceptor = getAcceptor();
@@ -145,17 +178,25 @@ public class ClientServerConnectDisconnectDistributedTest
implements Serializabl
private void verifySubjectsAreLoggedOut() {
AcceptorImpl acceptor = getAcceptor();
+ // Wait for ServerConnections to be closed
+ waitForServerConnectionsToBeClosed(acceptor);
+
// Verify ServerConnection subjects are logged out
- verifyServerConnectionSubjectsAreLoggedOut(acceptor);
+ verifyServerConnectionSubjectsAreLoggedOut();
+
+ // Wait for CacheClientProxy to be closed
+ waitForCacheClientProxyToBeClosed(acceptor);
// Verify the CacheClientProxy subject is logged out
- verifyCacheClientProxyIsLoggedOut(acceptor);
+ verifyCacheClientProxyIsLoggedOut();
}
- private void verifyServerConnectionSubjectsAreLoggedOut(AcceptorImpl
acceptor) {
+ private void waitForServerConnectionsToBeClosed(AcceptorImpl acceptor) {
// Wait for all ServerConnections to be closed since handleTermination is
in the finally block
await().until(() -> acceptor.getAllServerConnections().isEmpty());
+ }
+ private void verifyServerConnectionSubjectsAreLoggedOut() {
for (Subject subject : serverConnectionSubjects) {
assertThat(subject.getPrincipal()).isNull();
assertThat(subject.getPrincipals()).isNull();
@@ -167,12 +208,33 @@ public class ClientServerConnectDisconnectDistributedTest
implements Serializabl
}
}
- private void verifyCacheClientProxyIsLoggedOut(AcceptorImpl acceptor) {
+ private void waitForCacheClientProxyToBeClosed(AcceptorImpl acceptor) {
// Wait for the CacheClientProxy to be closed since handleTermination is
in the finally block
await().until(() ->
acceptor.getCacheClientNotifier().getClientProxies().isEmpty());
+ }
+ private void verifyCacheClientProxyIsLoggedOut() {
assertThat(proxySubject.getPrincipal()).isNull();
assertThat(proxySubject.getPrincipals()).isNull();
assertThat(proxySubject.isAuthenticated()).isFalse();
}
+
+ private void waitForCacheClientProxyToBeClosed() {
+ waitForCacheClientProxyToBeClosed(getAcceptor());
+ }
+
+ private void verifyRealAndWireProxyIdsInFilterProfile(String regionName, int
expectedNumIds) {
+ // Get filter profile
+ Cache cache = ClusterStartupRule.getCache();
+ LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+ FilterProfile fp = region.getFilterProfile();
+
+ // Assert expectedNumIds real proxy id
+ Set realProxyIds = fp.getRealClientIds();
+ assertThat(realProxyIds.size()).isEqualTo(expectedNumIds);
+
+ // Assert expectedNumIds wire proxy id
+ Set wireProxyIds = fp.getWireClientIds();
+ assertThat(wireProxyIds.size()).isEqualTo(expectedNumIds);
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 3c740e6..afe292d 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -37,6 +37,7 @@ import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
@@ -942,6 +943,9 @@ public class FilterProfile implements
DataSerializableFixedID {
this.closeCq(cq);
}
}
+
+ // Remove the client from the clientMap
+ this.clientMap.removeIDMapping(client);
}
/**
@@ -1705,6 +1709,28 @@ public class FilterProfile implements
DataSerializableFixedID {
}
/**
+ * Return the set of real client proxy ids
+ *
+ * @return the set of real client proxy ids
+ */
+ @VisibleForTesting
+ public Set getRealClientIds() {
+ return clientMap == null ? Collections.emptySet()
+ : Collections.unmodifiableSet(clientMap.realIDs.keySet());
+ }
+
+ /**
+ * Return the set of wire client proxy ids
+ *
+ * @return the set of wire client proxy ids
+ */
+ @VisibleForTesting
+ public Set getWireClientIds() {
+ return clientMap == null ? Collections.emptySet()
+ : Collections.unmodifiableSet(clientMap.wireIDs.keySet());
+ }
+
+ /**
* ensure that the given query contains a filter routing ID
*/
private void ensureCqID(ServerCQ cq) {
@@ -2024,6 +2050,7 @@ public class FilterProfile implements
DataSerializableFixedID {
this.hasLongID = true;
}
result = nextID++;
+ logger.warn("XXX IDMap.getWireID about to put realId=" + realId,
new Exception());
this.realIDs.put(realId, result);
this.wireIDs.put(result, realId);
}
@@ -2074,6 +2101,17 @@ public class FilterProfile implements
DataSerializableFixedID {
}
}
+ /**
+ * remove the mapping for the given proxy ID
+ */
+ void removeIDMapping(Object clientId) {
+ logger.warn("XXX IDMap.removeIDMapping about to remove clientId=" +
clientId,
+ new Exception());
+ Long mappedId = this.realIDs.remove(clientId);
+ if (mappedId != null) {
+ this.wireIDs.remove(mappedId);
+ }
+ }
}
/**