This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a8bc0a3  GEODE-6287: Cleaned up FilterProfile clientMap when client 
disconnects normally
a8bc0a3 is described below

commit a8bc0a389e82c57f729e09fd0c19e50828054a8a
Author: Barry Oglesby <[email protected]>
AuthorDate: Mon Jan 28 13:37:24 2019 -0800

    GEODE-6287: Cleaned up FilterProfile clientMap when client disconnects 
normally
---
 ...ientServerConnectDisconnectDistributedTest.java | 116 ++++++++++++++++-----
 .../apache/geode/internal/cache/FilterProfile.java |  38 +++++++
 2 files changed, 127 insertions(+), 27 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
index 9c0086e..222b1f0 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerConnectDisconnectDistributedTest.java
@@ -20,35 +20,35 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.shiro.subject.Subject;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.examples.SimpleSecurityManager;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 @Category({SecurityTest.class})
 public class ClientServerConnectDisconnectDistributedTest implements 
Serializable {
 
-  @ClassRule
-  public static ClusterStartupRule cluster = new ClusterStartupRule();
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
 
-  private static MemberVM locator;
-  private static MemberVM server;
-  private static ClientVM client;
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
 
   private static List<Subject> serverConnectionSubjects;
 
@@ -56,28 +56,26 @@ public class ClientServerConnectDisconnectDistributedTest 
implements Serializabl
 
   private static List<ClientUserAuths> authorizations;
 
-  @BeforeClass
-  public static void beforeClass() {
-    locator = cluster.startLocatorVM(0, l -> 
l.withSecurityManager(SimpleSecurityManager.class));
+  @Test
+  public void testSubjectsLoggedOutOnClientConnectDisconnect() throws 
Exception {
+    // Start Locator
+    MemberVM locator =
+        cluster.startLocatorVM(0, l -> 
l.withSecurityManager(SimpleSecurityManager.class));
+
+    // Start server
     int locatorPort = locator.getPort();
-    server = cluster.startServerVM(1, s -> s.withCredential("cluster", 
"cluster")
-        
.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION,
-            "ClientServerConnectDisconnectDistributedTest_region"));
-  }
+    String regionName = testName.getMethodName() + "_region";
+    MemberVM server = cluster.startServerVM(1, s -> 
s.withCredential("cluster", "cluster")
+        
.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, 
regionName));
 
-  @Test
-  public void testClientConnectDisconnect() throws Exception {
     // Connect client
-    int locatorPort = locator.getPort();
-    client = cluster.startClientVM(3, c -> c.withCredential("data", "data")
+    ClientVM client = cluster.startClientVM(3, c -> c.withCredential("data", 
"data")
         .withPoolSubscription(true)
         .withLocatorConnection(locatorPort));
 
     // Do some puts
     client.invoke(() -> {
-      ClientCache cache = ClusterStartupRule.getClientCache();
-      Region region = 
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
-          .create("ClientServerConnectDisconnectDistributedTest_region");
+      Region region = 
ClusterStartupRule.clientCacheRule.createProxyRegion(regionName);
       for (int i = 0; i < 10; i++) {
         Object key = String.valueOf(i);
         region.put(key, key);
@@ -96,6 +94,41 @@ public class ClientServerConnectDisconnectDistributedTest 
implements Serializabl
     server.invoke(() -> verifySubjectsAreLoggedOut());
   }
 
+  @Test
+  public void testFilterProfileCleanupOnClientConnectDisconnect() throws 
Exception {
+    // Start Locator
+    MemberVM locator = cluster.startLocatorVM(0);
+
+    // Start server
+    int locatorPort = locator.getPort();
+    String regionName = testName.getMethodName() + "_region";
+    MemberVM server = cluster.startServerVM(1, s -> 
s.withConnectionToLocator(locatorPort)
+        .withRegion(RegionShortcut.PARTITION, regionName));
+
+    // Connect client
+    ClientVM client = cluster.startClientVM(3,
+        c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+
+    // Create client region and register interest
+    client.invoke(() -> {
+      
ClusterStartupRule.clientCacheRule.createProxyRegion(regionName).registerInterestForAllKeys();
+    });
+
+    // Verify proxy id is registered in filter profile
+    server.invoke(() -> verifyRealAndWireProxyIdsInFilterProfile(regionName, 
1));
+
+    // Close client
+    client.invoke(() -> {
+      ClusterStartupRule.getClientCache().close();
+    });
+
+    // Wait for CacheClientProxy to be closed
+    server.invoke(() -> waitForCacheClientProxyToBeClosed());
+
+    // Verify proxy id is unregistered from filter profile
+    server.invoke(() -> verifyRealAndWireProxyIdsInFilterProfile(regionName, 
0));
+  }
+
   private void verifySubjectsAreLoggedIn() {
     AcceptorImpl acceptor = getAcceptor();
 
@@ -145,17 +178,25 @@ public class ClientServerConnectDisconnectDistributedTest 
implements Serializabl
   private void verifySubjectsAreLoggedOut() {
     AcceptorImpl acceptor = getAcceptor();
 
+    // Wait for ServerConnections to be closed
+    waitForServerConnectionsToBeClosed(acceptor);
+
     // Verify ServerConnection subjects are logged out
-    verifyServerConnectionSubjectsAreLoggedOut(acceptor);
+    verifyServerConnectionSubjectsAreLoggedOut();
+
+    // Wait for CacheClientProxy to be closed
+    waitForCacheClientProxyToBeClosed(acceptor);
 
     // Verify the CacheClientProxy subject is logged out
-    verifyCacheClientProxyIsLoggedOut(acceptor);
+    verifyCacheClientProxyIsLoggedOut();
   }
 
-  private void verifyServerConnectionSubjectsAreLoggedOut(AcceptorImpl 
acceptor) {
+  private void waitForServerConnectionsToBeClosed(AcceptorImpl acceptor) {
     // Wait for all ServerConnections to be closed since handleTermination is 
in the finally block
     await().until(() -> acceptor.getAllServerConnections().isEmpty());
+  }
 
+  private void verifyServerConnectionSubjectsAreLoggedOut() {
     for (Subject subject : serverConnectionSubjects) {
       assertThat(subject.getPrincipal()).isNull();
       assertThat(subject.getPrincipals()).isNull();
@@ -167,12 +208,33 @@ public class ClientServerConnectDisconnectDistributedTest 
implements Serializabl
     }
   }
 
-  private void verifyCacheClientProxyIsLoggedOut(AcceptorImpl acceptor) {
+  private void waitForCacheClientProxyToBeClosed(AcceptorImpl acceptor) {
     // Wait for the CacheClientProxy to be closed since handleTermination is 
in the finally block
     await().until(() -> 
acceptor.getCacheClientNotifier().getClientProxies().isEmpty());
+  }
 
+  private void verifyCacheClientProxyIsLoggedOut() {
     assertThat(proxySubject.getPrincipal()).isNull();
     assertThat(proxySubject.getPrincipals()).isNull();
     assertThat(proxySubject.isAuthenticated()).isFalse();
   }
+
+  private void waitForCacheClientProxyToBeClosed() {
+    waitForCacheClientProxyToBeClosed(getAcceptor());
+  }
+
+  private void verifyRealAndWireProxyIdsInFilterProfile(String regionName, int 
expectedNumIds) {
+    // Get filter profile
+    Cache cache = ClusterStartupRule.getCache();
+    LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+    FilterProfile fp = region.getFilterProfile();
+
+    // Assert expectedNumIds real proxy id
+    Set realProxyIds = fp.getRealClientIds();
+    assertThat(realProxyIds.size()).isEqualTo(expectedNumIds);
+
+    // Assert expectedNumIds wire proxy id
+    Set wireProxyIds = fp.getWireClientIds();
+    assertThat(wireProxyIds.size()).isEqualTo(expectedNumIds);
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 3c740e6..afe292d 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -37,6 +37,7 @@ import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Operation;
@@ -942,6 +943,9 @@ public class FilterProfile implements 
DataSerializableFixedID {
         this.closeCq(cq);
       }
     }
+
+    // Remove the client from the clientMap
+    this.clientMap.removeIDMapping(client);
   }
 
   /**
@@ -1705,6 +1709,28 @@ public class FilterProfile implements 
DataSerializableFixedID {
   }
 
   /**
+   * Return the set of real client proxy ids
+   *
+   * @return the set of real client proxy ids
+   */
+  @VisibleForTesting
+  public Set getRealClientIds() {
+    return clientMap == null ? Collections.emptySet()
+        : Collections.unmodifiableSet(clientMap.realIDs.keySet());
+  }
+
+  /**
+   * Return the set of wire client proxy ids
+   *
+   * @return the set of wire client proxy ids
+   */
+  @VisibleForTesting
+  public Set getWireClientIds() {
+    return clientMap == null ? Collections.emptySet()
+        : Collections.unmodifiableSet(clientMap.wireIDs.keySet());
+  }
+
+  /**
    * ensure that the given query contains a filter routing ID
    */
   private void ensureCqID(ServerCQ cq) {
@@ -2024,6 +2050,7 @@ public class FilterProfile implements 
DataSerializableFixedID {
               this.hasLongID = true;
             }
             result = nextID++;
+            logger.warn("XXX IDMap.getWireID about to put realId=" + realId, 
new Exception());
             this.realIDs.put(realId, result);
             this.wireIDs.put(result, realId);
           }
@@ -2074,6 +2101,17 @@ public class FilterProfile implements 
DataSerializableFixedID {
       }
     }
 
+    /**
+     * remove the mapping for the given proxy ID
+     */
+    void removeIDMapping(Object clientId) {
+      logger.warn("XXX IDMap.removeIDMapping about to remove clientId=" + 
clientId,
+          new Exception());
+      Long mappedId = this.realIDs.remove(clientId);
+      if (mappedId != null) {
+        this.wireIDs.remove(mappedId);
+      }
+    }
   }
 
   /**

Reply via email to