This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3637 by this push:
new 6e939a4 Applying review comments
6e939a4 is described below
commit 6e939a45c2ef6360f82a044936de1bbbbafadfe9
Author: Dan Smith <[email protected]>
AuthorDate: Mon Dec 4 17:21:21 2017 -0800
Applying review comments
---
.../internal/cache/tier/sockets/AcceptorImpl.java | 4 +-
.../cache/tier/sockets/ServerConnection.java | 1 -
.../sockets/AcceptorImplClientQueueDUnitTest.java | 82 +++++++++++-----------
3 files changed, 43 insertions(+), 44 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index abc23e7..bdc2d81 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -693,9 +693,9 @@ public class AcceptorImpl implements Acceptor, Runnable,
CommBufferPool {
* @deprecated since 5.1 use cache-server max-threads instead
*/
@Deprecated
- private static final int DEPRECATED_SELECTOR_POOL_SIZE =
+ private final int DEPRECATED_SELECTOR_POOL_SIZE =
Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
- private static final int HANDSHAKE_POOL_SIZE = Integer
+ private final int HANDSHAKE_POOL_SIZE = Integer
.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE",
HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 74451e5..368a2f8 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1208,7 +1208,6 @@ public abstract class ServerConnection implements
Runnable {
}
public void registerWithSelector2(Selector s) throws IOException {
- /* this.sKey = */
getSelectableChannel().register(s, SelectionKey.OP_READ, this);
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
index c0b2d07..2752971 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -21,13 +21,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.rmi.RemoteException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -48,6 +46,7 @@ import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedLockBlackboard;
import org.apache.geode.distributed.DistributedLockBlackboardImpl;
+import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.DiskStoreAttributes;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
@@ -57,7 +56,6 @@ import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedTestRule;
-import org.apache.geode.test.dunit.rules.SharedCountersRule;
import org.apache.geode.test.junit.categories.DistributedTest;
import
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
@@ -65,9 +63,7 @@ import
org.apache.geode.test.junit.rules.serializable.SerializableTestName;
@Category(DistributedTest.class)
public class AcceptorImplClientQueueDUnitTest implements Serializable {
private final Host host = Host.getHost(0);
- private static final int numberOfEntries = 200;
- private static final AtomicInteger eventCount = new AtomicInteger(0);
- private static final AtomicBoolean completedClient2 = new
AtomicBoolean(false);
+ private static final int NUMBER_OF_ENTRIES = 200;
@ClassRule
public static DistributedTestRule distributedTestRule = new
DistributedTestRule();
@@ -87,16 +83,11 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
- private DistributedLockBlackboard blackboard = null;
-
@Before
- public void setup() throws Exception {
- blackboard = DistributedLockBlackboardImpl.getInstance();
- }
+ public void setup() throws Exception {}
@After
public void tearDown() throws RemoteException {
- blackboard.initCount();
host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
InitialImageOperation.slowImageProcessing = 0;
System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
@@ -104,19 +95,18 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
}
@Test
- public void testClientSubscriptionQueueBlockingConnectionInitialization()
throws Exception {
+ public void
clientSubscriptionQueueInitializationShouldNotBlockNewConnections() throws
Exception {
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
- int vm0_port = vm0.invoke("Start server with subscription turned on", ()
-> {
- try {
- return createSubscriptionServer(cacheRule.getCache());
- } catch (IOException e) {
- return 0;
- }
- });
+ // Start one server
+ int vm0_port = vm0.invoke("Start server with subscription turned on",
+ () -> createSubscriptionServer(cacheRule.getCache()));
+
+
+ // Create a durable queue and shutdown the client
vm2.invoke("Start Client1 with durable interest registration turned on",
() -> {
ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
clientCacheFactory.setPoolSubscriptionEnabled(true);
@@ -133,6 +123,8 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
cache.readyForEvents();
cache.close(true);
});
+
+ // Add some entries, which will end up in the queue
vm3.invoke("Start Client2 to add entries to region", () -> {
ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
@@ -141,26 +133,30 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
Region region = clientRegionFactory.create("subscriptionRegion");
- for (int i = 0; i < numberOfEntries; i++) {
+ for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
region.put(i, i);
}
cache.close();
});
+ // Start a second server
int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on",
() -> {
try {
int serverPort = createSubscriptionServer(cacheRule.getCache());
- InitialImageOperation.slowImageProcessing = 30;
+ InitialImageOperation.slowImageProcessing = 500;
return serverPort;
} catch (IOException e) {
return 0;
}
});
+ // Make copying the queue slow
vm0.invoke("Turn on slow image processsing", () -> {
- InitialImageOperation.slowImageProcessing = 30;
+ InitialImageOperation.slowImageProcessing = 500;
});
+ // Restart the durable client, which will try to make a copy of the queue.
+ // The copy will take a long time because we made it slow
AsyncInvocation<Boolean> completedClient1 =
vm2.invokeAsync("Start Client1, expecting durable messages to be
delivered", () -> {
@@ -169,15 +165,15 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
clientCacheFactory.setPoolSubscriptionRedundancy(1);
clientCacheFactory.setPoolMinConnections(1);
clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
ClientCacheFactory cacheFactory =
clientCacheFactory.set("durable-client-id", "1")
.set("durable-client-timeout", "300").set("mcast-port", "0");
- blackboard.incCount();
ClientCache cache = cacheFactory.create();
ClientRegionFactory<Object, Object> clientRegionFactory =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ AtomicInteger eventCount = new AtomicInteger(0);
+
Region region = clientRegionFactory.addCacheListener(new
CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
@@ -192,28 +188,23 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
cache.readyForEvents();
- Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
- .until(() -> eventCount.get() == numberOfEntries);
+ Awaitility.await().atMost(200, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .until(() -> eventCount.get() == NUMBER_OF_ENTRIES);
cache.close();
- return eventCount.get() == numberOfEntries;
+ return eventCount.get() == NUMBER_OF_ENTRIES;
});
- vm3.invokeAsync("Start Client2 to add entries to region", () -> {
- while (true) {
- Thread.sleep(100);
- if (blackboard.getCount() == 1) {
- break;
- }
- }
- ClientCache cache = null;
+ Thread.sleep(500);
+
+ // Start a second client, which should not be blocked by the queue copying
+ vm3.invoke("Start Client2 to add entries to region", () -> {
ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
clientCacheFactory.setPoolRetryAttempts(0);
clientCacheFactory.setPoolMinConnections(1);
clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.setPoolSocketConnectTimeout(500);
+ clientCacheFactory.setPoolSocketConnectTimeout(5000);
clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
- cache = clientCacheFactory.set("mcast-port", "0").create();
+ ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
ClientRegionFactory<Object, Object> clientRegionFactory =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
Region region = clientRegionFactory.create("subscriptionRegion");
@@ -223,10 +214,19 @@ public class AcceptorImplClientQueueDUnitTest implements
Serializable {
returnValue = (int) region.get(i);
}
cache.close();
- completedClient2.set(returnValue == 99);
});
+
+ // Turn slow image processing back off so copying the queue is no longer slowed down
+ turnOffSlowImageProcessing(vm0);
+ turnOffSlowImageProcessing(vm1);
+
assertTrue(completedClient1.get());
- assertTrue(vm3.invoke(() -> completedClient2.get()));
+ }
+
+ private void turnOffSlowImageProcessing(VM vm0) {
+ vm0.invoke("Turn off slow image processsing", () -> {
+ InitialImageOperation.slowImageProcessing = 0;
+ });
}
private int createSubscriptionServer(InternalCache cache) throws IOException
{
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].