This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3637 by this 
push:
     new 6e939a4  Applying review comments
6e939a4 is described below

commit 6e939a45c2ef6360f82a044936de1bbbbafadfe9
Author: Dan Smith <[email protected]>
AuthorDate: Mon Dec 4 17:21:21 2017 -0800

    Applying review comments
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  4 +-
 .../cache/tier/sockets/ServerConnection.java       |  1 -
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 82 +++++++++++-----------
 3 files changed, 43 insertions(+), 44 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index abc23e7..bdc2d81 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -693,9 +693,9 @@ public class AcceptorImpl implements Acceptor, Runnable, 
CommBufferPool {
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
-  private static final int DEPRECATED_SELECTOR_POOL_SIZE =
+  private final int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private static final int HANDSHAKE_POOL_SIZE = Integer
+  private final int HANDSHAKE_POOL_SIZE = Integer
       .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 
HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
 
   @Override
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 74451e5..368a2f8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1208,7 +1208,6 @@ public abstract class ServerConnection implements 
Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */
     getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
index c0b2d07..2752971 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -21,13 +21,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.rmi.RemoteException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,6 +46,7 @@ import org.apache.geode.cache.server.ClientSubscriptionConfig;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.DistributedLockBlackboard;
 import org.apache.geode.distributed.DistributedLockBlackboardImpl;
+import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.DiskStoreAttributes;
 import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.InternalCache;
@@ -57,7 +56,6 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.DistributedTestRule;
-import org.apache.geode.test.dunit.rules.SharedCountersRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import 
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
@@ -65,9 +63,7 @@ import 
org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 @Category(DistributedTest.class)
 public class AcceptorImplClientQueueDUnitTest implements Serializable {
   private final Host host = Host.getHost(0);
-  private static final int numberOfEntries = 200;
-  private static final AtomicInteger eventCount = new AtomicInteger(0);
-  private static final AtomicBoolean completedClient2 = new 
AtomicBoolean(false);
+  private static final int NUMBER_OF_ENTRIES = 200;
 
   @ClassRule
   public static DistributedTestRule distributedTestRule = new 
DistributedTestRule();
@@ -87,16 +83,11 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
 
-  private DistributedLockBlackboard blackboard = null;
-
   @Before
-  public void setup() throws Exception {
-    blackboard = DistributedLockBlackboardImpl.getInstance();
-  }
+  public void setup() throws Exception {}
 
   @After
   public void tearDown() throws RemoteException {
-    blackboard.initCount();
     host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
       InitialImageOperation.slowImageProcessing = 0;
       System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
@@ -104,19 +95,18 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
   }
 
   @Test
-  public void testClientSubscriptionQueueBlockingConnectionInitialization() 
throws Exception {
+  public void 
clientSubscriptionQueueInitializationShouldNotBlockNewConnections() throws 
Exception {
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
     VM vm2 = host.getVM(2);
     VM vm3 = host.getVM(3);
-    int vm0_port = vm0.invoke("Start server with subscription turned on", () 
-> {
-      try {
-        return createSubscriptionServer(cacheRule.getCache());
-      } catch (IOException e) {
-        return 0;
-      }
-    });
 
+    // Start one server
+    int vm0_port = vm0.invoke("Start server with subscription turned on",
+        () -> createSubscriptionServer(cacheRule.getCache()));
+
+
+    // Create a durable queue and shutdown the client
     vm2.invoke("Start Client1 with durable interest registration turned on", 
() -> {
       ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
       clientCacheFactory.setPoolSubscriptionEnabled(true);
@@ -133,6 +123,8 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
       cache.readyForEvents();
       cache.close(true);
     });
+
+    // Add some entries Which will end up the in the queue
     vm3.invoke("Start Client2 to add entries to region", () -> {
       ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
       clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
@@ -141,26 +133,30 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
           cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
       Region region = clientRegionFactory.create("subscriptionRegion");
 
-      for (int i = 0; i < numberOfEntries; i++) {
+      for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
         region.put(i, i);
       }
       cache.close();
     });
 
+    // Start a second server
     int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", 
() -> {
       try {
         int serverPort = createSubscriptionServer(cacheRule.getCache());
-        InitialImageOperation.slowImageProcessing = 30;
+        InitialImageOperation.slowImageProcessing = 500;
         return serverPort;
       } catch (IOException e) {
         return 0;
       }
     });
 
+    // Make copying the queue slow
     vm0.invoke("Turn on slow image processsing", () -> {
-      InitialImageOperation.slowImageProcessing = 30;
+      InitialImageOperation.slowImageProcessing = 500;
     });
 
+    // Restart the durable client, which will try to make a copy of the queue, 
which will
+    // take a long time because we made it slow
     AsyncInvocation<Boolean> completedClient1 =
         vm2.invokeAsync("Start Client1, expecting durable messages to be 
delivered", () -> {
 
@@ -169,15 +165,15 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
           clientCacheFactory.setPoolSubscriptionRedundancy(1);
           clientCacheFactory.setPoolMinConnections(1);
           clientCacheFactory.setPoolMaxConnections(1);
-          clientCacheFactory.setPoolReadTimeout(200);
           clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
           ClientCacheFactory cacheFactory = 
clientCacheFactory.set("durable-client-id", "1")
               .set("durable-client-timeout", "300").set("mcast-port", "0");
-          blackboard.incCount();
           ClientCache cache = cacheFactory.create();
 
           ClientRegionFactory<Object, Object> clientRegionFactory =
               cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          AtomicInteger eventCount = new AtomicInteger(0);
+
           Region region = clientRegionFactory.addCacheListener(new 
CacheListenerAdapter() {
             @Override
             public void afterCreate(EntryEvent event) {
@@ -192,28 +188,23 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
 
           region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
           cache.readyForEvents();
-          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
-              .until(() -> eventCount.get() == numberOfEntries);
+          Awaitility.await().atMost(200, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == NUMBER_OF_ENTRIES);
           cache.close();
-          return eventCount.get() == numberOfEntries;
+          return eventCount.get() == NUMBER_OF_ENTRIES;
         });
 
-    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
-      while (true) {
-        Thread.sleep(100);
-        if (blackboard.getCount() == 1) {
-          break;
-        }
-      }
-      ClientCache cache = null;
+    Thread.sleep(500);
+
+    // Start a second client, which should not be blocked by the queue copying
+    vm3.invoke("Start Client2 to add entries to region", () -> {
       ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
       clientCacheFactory.setPoolRetryAttempts(0);
       clientCacheFactory.setPoolMinConnections(1);
       clientCacheFactory.setPoolMaxConnections(1);
-      clientCacheFactory.setPoolReadTimeout(200);
-      clientCacheFactory.setPoolSocketConnectTimeout(500);
+      clientCacheFactory.setPoolSocketConnectTimeout(5000);
       clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
-      cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
       ClientRegionFactory<Object, Object> clientRegionFactory =
           cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
       Region region = clientRegionFactory.create("subscriptionRegion");
@@ -223,10 +214,19 @@ public class AcceptorImplClientQueueDUnitTest implements 
Serializable {
         returnValue = (int) region.get(i);
       }
       cache.close();
-      completedClient2.set(returnValue == 99);
     });
+
+    // Make copying the queue slow
+    turnOffSlowImageProcessing(vm0);
+    turnOffSlowImageProcessing(vm1);
+
     assertTrue(completedClient1.get());
-    assertTrue(vm3.invoke(() -> completedClient2.get()));
+  }
+
+  private void turnOffSlowImageProcessing(VM vm0) {
+    vm0.invoke("Turn off slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 0;
+    });
   }
 
   private int createSubscriptionServer(InternalCache cache) throws IOException 
{

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to