This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new fe19efe GEODE-4118/GEODE-5406: rework ClientHealthStatsDUnitTest
(#2203)
fe19efe is described below
commit fe19efe01f35c43129d678af745415022c1190e1
Author: jinmeiliao <[email protected]>
AuthorDate: Fri Jul 27 08:56:46 2018 -0700
GEODE-4118/GEODE-5406: rework ClientHealthStatsDUnitTest (#2203)
---
.../management/ClientHealthStatsDUnitTest.java | 412 +++++++--------------
.../geode/test/dunit/rules/ClusterStartupRule.java | 16 +-
.../apache/geode/test/dunit/rules/MemberVM.java | 7 +
.../dunit/rules/tests/MemberStarterRuleTest.java | 9 +
.../geode/test/junit/rules/LocatorStarterRule.java | 20 +-
.../geode/test/junit/rules/MemberStarterRule.java | 27 +-
6 files changed, 201 insertions(+), 290 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
index 68fc6c7..2336940 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
@@ -12,352 +12,210 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
+
package org.apache.geode.management;
-import static java.util.concurrent.TimeUnit.MINUTES;
import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static
org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-import static org.apache.geode.test.dunit.Host.getHost;
-import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
-import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
-import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
import static org.assertj.core.api.Assertions.assertThat;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.Properties;
-
-import javax.management.ObjectName;
import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.VM;
-
-/**
- * Distributed tests for client stats exposed via {@link CacheServerMXBean}:
- * <ul>
- * <li>{@link CacheServerMXBean#showClientStats}
- * <li>{@link CacheServerMXBean#showAllClientStats}
- * <li>{@link CacheServerMXBean#showClientQueueDetails}
- * </ul>
- */
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.VMProvider;
-@SuppressWarnings({"serial", "unused"})
public class ClientHealthStatsDUnitTest implements Serializable {
-
private static final int NUMBER_PUTS = 100;
- private static final String KEY1 = "KEY1";
- private static final String KEY2 = "KEY2";
- private static final String VALUE1 = "VALUE1";
- private static final String VALUE2 = "VALUE2";
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule();
- private static final String REGION_NAME =
- ClientHealthStatsDUnitTest.class.getSimpleName() + "_Region";
+ private MemberVM locator;
+ private MemberVM server;
+ private ClientVM client1;
+ private ClientVM client2;
- // client1VM and client2VM VM fields
- private static volatile ClientCache clientCache;
private static volatile boolean lastKeyReceived;
- private VM managerVM;
- private VM serverVM;
- private VM client1VM;
- private VM client2VM;
-
- private String hostName;
-
- @Rule
- public ManagementTestRule managementTestRule =
ManagementTestRule.builder().build();
-
@Before
- public void before() throws Exception {
- this.hostName = getServerHostName(getHost(0));
-
- this.managerVM = getHost(0).getVM(0);
- this.serverVM = getHost(0).getVM(1);
- this.client1VM = getHost(0).getVM(2);
- this.client2VM = getHost(0).getVM(3);
-
- addIgnoredException("Connection reset");
- }
-
- @After
- public void after() throws Exception {
- invokeInEveryVM(() -> {
- lastKeyReceived = false;
- clientCache = null;
- });
+ public void before() {
+ locator =
+ cluster.startLocatorVM(0, r ->
r.withoutClusterConfigurationService().withoutHttpService());
+ server = cluster.startServerVM(1, s ->
s.withRegion(RegionShortcut.REPLICATE, "regionA")
+ .withConnectionToLocator(locator.getPort()));
}
@Test
public void testClientHealthStats_SubscriptionEnabled() throws Exception {
- this.managementTestRule.createManager(this.managerVM, false);
- this.managementTestRule.startManager(this.managerVM);
-
- int port = this.serverVM.invoke(() -> createServerCache());
-
- this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1,
true));
- this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2,
true));
-
- DistributedMember serverMember =
this.managementTestRule.getDistributedMember(this.serverVM);
- this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
- this.managementTestRule.stopManager(this.managerVM);
+ client1 = cluster.startClientVM(2, true, server.getPort());
+ client2 = cluster.startClientVM(3, true, server.getPort());
+
+ VMProvider.invokeInEveryMember(() -> {
+ ClientRegionFactory<String, String> regionFactory =
+ ClusterStartupRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+ Region<String, String> region = regionFactory.create("regionA");
+ // need to do some operation in order for the clients to be registered
in mBean
+ region.put("1", "1");
+ }, client1, client2);
+
+ locator.waitTillClientsAreReadyOnServers(server.getName(),
server.getPort(), 2);
+ verifyClientsAndSubscription(2);
}
@Test
public void testClientHealthStats_SubscriptionDisabled() throws Exception {
- this.managementTestRule.createManager(this.managerVM, false);
- this.managementTestRule.startManager(this.managerVM);
-
- int port = this.serverVM.invoke(() -> createServerCache());
-
- this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1,
false));
- this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2,
false));
-
- DistributedMember serverMember =
this.managementTestRule.getDistributedMember(this.serverVM);
- this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 0));
- this.managementTestRule.stopManager(this.managerVM);
+ client1 = cluster.startClientVM(2, false, server.getPort());
+ client2 = cluster.startClientVM(3, false, server.getPort());
+ VMProvider.invokeInEveryMember(() -> {
+ ClientRegionFactory<String, String> regionFactory =
+ ClusterStartupRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+ regionFactory.create("regionA");
+ }, client1, client2);
+
+ locator.waitTillClientsAreReadyOnServers(server.getName(),
server.getPort(), 2);
+ verifyClientsAndSubscription(0);
}
@Test
public void testClientHealthStats_DurableClient() throws Exception {
- this.managementTestRule.createManager(this.managerVM, false);
- this.managementTestRule.startManager(this.managerVM);
-
- int port = this.serverVM.invoke(() -> createServerCache());
-
- this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1,
true));
- this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2,
true));
+ client1 = createDurableClient(2);
+ client2 = createDurableClient(3);
- this.client1VM.invoke(() -> clientCache.close(true));
- this.client2VM.invoke(() -> clientCache.close(true));
-
- DistributedMember serverMember =
this.managementTestRule.getDistributedMember(this.serverVM);
- this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
- this.managementTestRule.stopManager(this.managerVM);
+ locator.waitTillClientsAreReadyOnServers(server.getName(),
server.getPort(), 2);
+ VMProvider.invokeInEveryMember(() ->
ClusterStartupRule.getClientCache().close(true), client1,
+ client2);
+ verifyClientsAndSubscription(2);
}
@Test
public void testStatsMatchWithSize() throws Exception {
- // start a serverVM
- int port = this.serverVM.invoke(() -> createServerCache());
+ // create durable client, with durable RI
+ client1 = createDurableClient(2);
- // create durable client1VM, with durable RI
- this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1,
true));
+ // do puts in server
+ server.invoke(() -> {
+ Region<String, String> region =
ClusterStartupRule.getCache().getRegion("/regionA");
- // do puts on serverVM from three different threads, pause after 500 puts
each.
- this.serverVM.invoke(() -> doPuts());
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T1_KEY_" + i, "VALUE_" + i);
+ }
+ });
+ Thread thread2 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T2_KEY_" + i, "VALUE_" + i);
+ }
+ });
+ Thread thread3 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T3_KEY_" + i, "VALUE_" + i);
+ }
+ });
- // close durable client1VM
- this.client1VM.invoke(() -> clientCache.close(true));
+ thread1.start();
+ thread2.start();
+ thread3.start();
- this.serverVM.invoke(() -> await().until(() ->
cacheClientProxyHasBeenPause()));
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ });
- // resume puts on serverVM, add another 100.
- this.serverVM.invoke(() -> resumePuts());
+ // close durable client1
+ client1.invoke(() -> ClusterStartupRule.getClientCache().close(true));
+ server.waitTillCacheClientProxyHasBeenPaused();
- // start durable client1VM
- this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1,
true));
+ // resume puts on serverVM, add another 100.
+ server.invoke(() -> {
+ Region<String, String> region =
ClusterStartupRule.getCache().getRegion("/regionA");
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("NEWKEY_" + i, "NEWVALUE_" + i);
+ }
+ region.put("last_key", "last_value");
+ });
+ // start durable client1 again
+ client1 = createDurableClient(2);
// wait for full queue dispatch
- this.client1VM.invoke(() -> await().until(() -> lastKeyReceived));
+ client1.invoke(() -> Awaitility.await().until(() -> lastKeyReceived));
// verify the stats
- this.serverVM.invoke(() -> verifyStats(port));
- }
-
- /**
- * Invoked in serverVM
- */
- private boolean cacheClientProxyHasBeenPause() {
- CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
- Collection<CacheClientProxy> clientProxies =
clientNotifier.getClientProxies();
+ server.invoke(() -> {
+ ManagementService service =
ClusterStartupRule.memberStarter.getManagementService();
+ CacheServerMXBean cacheServerMXBean =
service.getLocalCacheServerMXBean(server.getPort());
- for (CacheClientProxy clientProxy : clientProxies) {
- if (clientProxy.isPaused()) {
- return true;
- }
- }
- return false;
- }
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ CacheClientProxy clientProxy =
clientNotifier.getClientProxies().iterator().next();
+
assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
- /**
- * Invoked in serverVM
- */
- private int createServerCache() throws IOException {
- Cache cache = this.managementTestRule.getCache();
-
- RegionFactory<String, String> regionFactory =
- cache.createRegionFactory(RegionShortcut.REPLICATE);
- regionFactory.setConcurrencyChecksEnabled(false);
- regionFactory.create(REGION_NAME);
-
- CacheServer cacheServer = cache.addCacheServer();
- cacheServer.setPort(0);
- cacheServer.start();
- return cacheServer.getPort();
+ ClientQueueDetail queueDetails =
cacheServerMXBean.showClientQueueDetails()[0];
+ assertThat((int)
queueDetails.getQueueSize()).isEqualTo(clientProxy.getQueueSizeStat());
+ });
}
- /**
- * Invoked in client1VM and client2VM
- */
- private void createClientCache(final String hostName, final Integer port,
final int clientNum,
- final boolean subscriptionEnabled) {
- Properties props = new Properties();
- props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
-
- ClientCacheFactory cacheFactory = new ClientCacheFactory(props);
- if (subscriptionEnabled) {
- cacheFactory.setPoolSubscriptionEnabled(true);
- cacheFactory.setPoolSubscriptionAckInterval(50);
- cacheFactory.setPoolSubscriptionRedundancy(0);
- }
-
- cacheFactory.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
- cacheFactory.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
-
- cacheFactory.addPoolServer(hostName, port);
- clientCache = cacheFactory.create();
-
- ClientRegionFactory<String, String> regionFactory =
-
clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
- regionFactory.setConcurrencyChecksEnabled(false);
-
- regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() {
- @Override
- public void afterCreate(final EntryEvent<String, String> event) {
- if ("last_key".equals(event.getKey())) {
- lastKeyReceived = true;
- }
- }
+ private ClientVM createDurableClient(int index) throws Exception {
+ ClientVM client = cluster.startClientVM(index, ccf -> {
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.addPoolServer("localhost", server.getPort());
+ ccf.set(DURABLE_CLIENT_ID, "client" + index);
+ ccf.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
});
- Region<String, String> region = regionFactory.create(REGION_NAME);
- if (subscriptionEnabled) {
+ client.invoke(() -> {
+ ClientRegionFactory<String, String> regionFactory =
ClusterStartupRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+ regionFactory.setConcurrencyChecksEnabled(false);
+
+ regionFactory.addCacheListener(new CacheListenerAdapter<String,
String>() {
+ @Override
+ public void afterCreate(final EntryEvent<String, String> event) {
+ if ("last_key".equals(event.getKey())) {
+ lastKeyReceived = true;
+ }
+ }
+ });
+ Region<String, String> region = regionFactory.create("regionA");
region.registerInterest("ALL_KEYS", true);
- clientCache.readyForEvents();
- }
- }
-
- /**
- * Invoked in serverVM
- */
- private void doPuts() throws InterruptedException {
- Cache cache = this.managementTestRule.getCache();
- Region<String, String> region = cache.getRegion(Region.SEPARATOR +
REGION_NAME);
-
- Thread thread1 = new Thread(() -> {
- for (int i = 0; i < NUMBER_PUTS; i++) {
- region.put("T1_KEY_" + i, "VALUE_" + i);
- }
- });
- Thread thread2 = new Thread(() -> {
- for (int i = 0; i < NUMBER_PUTS; i++) {
- region.put("T2_KEY_" + i, "VALUE_" + i);
- }
- });
- Thread thread3 = new Thread(() -> {
- for (int i = 0; i < NUMBER_PUTS; i++) {
- region.put("T3_KEY_" + i, "VALUE_" + i);
- }
+ ClusterStartupRule.getClientCache().readyForEvents();
});
-
- thread1.start();
- thread2.start();
- thread3.start();
-
- thread1.join();
- thread2.join();
- thread3.join();
+ return client;
}
- /**
- * Invoked in serverVM
- */
- private void resumePuts() {
- Cache cache = this.managementTestRule.getCache();
- Region<String, String> region = cache.getRegion(Region.SEPARATOR +
REGION_NAME);
-
- for (int i = 0; i < NUMBER_PUTS; i++) {
- region.put("NEWKEY_" + i, "NEWVALUE_" + i);
- }
- region.put("last_key", "last_value");
- }
-
- /**
- * Invoked in managerVM
- */
- private void verifyClientStats(final DistributedMember serverMember, final
int serverPort,
- final int numSubscriptions) throws Exception {
- ManagementService service = this.managementTestRule.getManagementService();
- CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember,
serverPort);
-
- await().until(() -> cacheServerMXBean.getClientIds().length == 2);
- String[] clientIds = cacheServerMXBean.getClientIds();
+ private void verifyClientsAndSubscription(int subscriptionCount) {
+ locator.invoke(() -> {
+ CacheServerMXBean bean =
+
ClusterStartupRule.memberStarter.getCacheServerMXBean(server.getName(),
server.getPort());
+ String[] clientIds = bean.getClientIds();
- ClientHealthStatus[] clientStatuses =
cacheServerMXBean.showAllClientStats();
-
- ClientHealthStatus clientStatus1 =
cacheServerMXBean.showClientStats(clientIds[0]);
- ClientHealthStatus clientStatus2 =
cacheServerMXBean.showClientStats(clientIds[1]);
- assertThat(clientStatus1).isNotNull();
- assertThat(clientStatus2).isNotNull();
-
- assertThat(clientStatuses).isNotNull().hasSize(2);
-
- DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
- assertThat(dsBean.getNumClients()).isEqualTo(2);
- assertThat(dsBean.getNumSubscriptions()).isEqualTo(numSubscriptions);
- }
+ ClientHealthStatus[] clientStatuses = bean.showAllClientStats();
+ assertThat(clientStatuses).isNotNull().hasSize(2);
- /**
- * Invoked in serverVM
- */
- private void verifyStats(final int serverPort) throws Exception {
- ManagementService service = this.managementTestRule.getManagementService();
- CacheServerMXBean cacheServerMXBean =
service.getLocalCacheServerMXBean(serverPort);
+ ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]);
+ ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]);
+ assertThat(clientStatus1).isNotNull();
+ assertThat(clientStatus2).isNotNull();
- CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
- CacheClientProxy clientProxy =
clientNotifier.getClientProxies().iterator().next();
-
assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
-
- ClientQueueDetail queueDetails =
cacheServerMXBean.showClientQueueDetails()[0];
- assertThat((int)
queueDetails.getQueueSize()).isEqualTo(clientProxy.getQueueSizeStat());
- }
-
- private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember
serverMember,
- final int port) {
- SystemManagementService service =
this.managementTestRule.getSystemManagementService();
- ObjectName objectName = service.getCacheServerMBeanName(port,
serverMember);
-
- await().until(
- () -> assertThat(service.getMBeanProxy(objectName,
CacheServerMXBean.class)).isNotNull());
-
- return service.getMBeanProxy(objectName, CacheServerMXBean.class);
- }
-
- private ConditionFactory await() {
- return Awaitility.await().atMost(2, MINUTES);
+ DistributedSystemMXBean dsBean =
+
ClusterStartupRule.memberStarter.getManagementService().getDistributedSystemMXBean();
+ assertThat(dsBean.getNumClients()).isEqualTo(2);
+ assertThat(dsBean.getNumSubscriptions()).isEqualTo(subscriptionCount);
+ });
}
}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
index e5fc1be..234d90e 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
@@ -284,25 +284,23 @@ public class ClusterStartupRule extends ExternalResource
implements Serializable
props.setProperty(UserPasswordAuthInit.PASSWORD, password);
props.setProperty(SECURITY_CLIENT_AUTH_INIT,
UserPasswordAuthInit.class.getName());
- SerializableConsumerIF<ClientCacheFactory> consumer = ((cacheFactory) -> {
- cacheFactory.setPoolSubscriptionEnabled(subscriptionEnabled);
+ return startClientVM(index, props, (ccf) -> {
+ ccf.setPoolSubscriptionEnabled(subscriptionEnabled);
for (int serverPort : serverPorts) {
- cacheFactory.addPoolServer("localhost", serverPort);
+ ccf.addPoolServer("localhost", serverPort);
}
});
- return startClientVM(index, props, consumer);
}
// convenient startClientMethod
public ClientVM startClientVM(int index, boolean subscriptionEnabled, int...
serverPorts)
throws Exception {
- SerializableConsumerIF<ClientCacheFactory> consumer = ((cacheFactory) -> {
- cacheFactory.setPoolSubscriptionEnabled(subscriptionEnabled);
- for (int serverPort : serverPorts) {
- cacheFactory.addPoolServer("localhost", serverPort);
+ return startClientVM(index, ccf -> {
+ ccf.setPoolSubscriptionEnabled(subscriptionEnabled);
+ for (int port : serverPorts) {
+ ccf.addPoolServer("localhost", port);
}
});
- return startClientVM(index, consumer);
}
/**
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
index d7d4eae..1fe04c8 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/MemberVM.java
@@ -143,6 +143,9 @@ public class MemberVM extends VMProvider implements Member {
}
+ /**
+ * this can only be called on a locator (or a vm that is not that serverName)
+ */
public void waitTillClientsAreReadyOnServers(String serverName, int
serverPort, int clientCount) {
vm.invoke(() ->
ClusterStartupRule.memberStarter.waitTillClientsAreReadyOnServer(serverName,
serverPort, clientCount));
@@ -166,4 +169,8 @@ public class MemberVM extends VMProvider implements Member {
.waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(expectedGatewayObjectCount));
}
+ public void waitTillCacheClientProxyHasBeenPaused() {
+ vm.invoke(() ->
ClusterStartupRule.memberStarter.waitTillCacheClientProxyHasBeenPaused());
+ }
+
}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
index 018c5e4..05853ae 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/MemberStarterRuleTest.java
@@ -110,4 +110,13 @@ public class MemberStarterRuleTest {
assertThat(locator.getWorkingDir()).isNotNull();
}
+
+ @Test
+ public void httpPort() {
+ locator = new LocatorStarterRule().withoutHttpService();
+ locator.before();
+
+ assertThat(locator.getHttpPort()).isEqualTo(0);
+ assertThat(locator.getJmxPort()).isGreaterThan(0);
+ }
}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
index 10e9a7a..08e6fa4 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.test.junit.rules;
+import static
org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static
org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
import static org.apache.geode.distributed.Locator.startLocatorAndDS;
import static org.junit.Assert.assertTrue;
@@ -42,6 +44,12 @@ import org.apache.geode.internal.cache.InternalCache;
* information, working dir, name, and the InternalLocator it creates.
*
* <p>
+ * by default the rule starts a locator with jmx and http service and cluster
configuration service
+ * you can turn off the http service and cluster configuration service to have
your test
+ * run faster if your test does not need them.
+ * </p>
+ *
+ * <p>
* If you need a rule to start a server/locator in different VMs for
Distributed tests, You should
* use {@code LocatorServerStartupRule}.
*/
@@ -72,8 +80,6 @@ public class LocatorStarterRule extends
MemberStarterRule<LocatorStarterRule> im
}
}
-
-
public void startLocator() {
try {
// this will start a jmx manager and admin rest service by default
@@ -95,6 +101,16 @@ public class LocatorStarterRule extends
MemberStarterRule<LocatorStarterRule> im
}
}
+ public LocatorStarterRule withoutHttpService() {
+ properties.put(HTTP_SERVICE_PORT, "0");
+ return this;
+ }
+
+ public LocatorStarterRule withoutClusterConfigurationService() {
+ properties.put(ENABLE_CLUSTER_CONFIGURATION, "false");
+ return this;
+ }
+
@Override
public InternalCache getCache() {
return locator.getCache();
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index 11b4904..953429d 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -35,6 +35,7 @@ import java.io.File;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
@@ -56,6 +57,8 @@ import
org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.management.CacheServerMXBean;
import org.apache.geode.management.DistributedRegionMXBean;
@@ -192,8 +195,11 @@ public abstract class MemberStarterRule<T> extends
SerializableExternalResource
}
public T withName(String name) {
- this.name = name;
- properties.putIfAbsent(NAME, name);
+ // only if name is not defined yet
+ if (!properties.containsKey(NAME)) {
+ this.name = name;
+ properties.putIfAbsent(NAME, name);
+ }
return (T) this;
}
@@ -316,6 +322,23 @@ public abstract class MemberStarterRule<T> extends
SerializableExternalResource
await().atMost(1, TimeUnit.MINUTES).until(() -> bean.getClientIds().length
== clientCount);
}
+ /**
+ * Invoked in serverVM
+ */
+ public void waitTillCacheClientProxyHasBeenPaused() {
+ await().until(() -> {
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ Collection<CacheClientProxy> clientProxies =
clientNotifier.getClientProxies();
+
+ for (CacheClientProxy clientProxy : clientProxies) {
+ if (clientProxy.isPaused()) {
+ return true;
+ }
+ }
+ return false;
+ });
+ }
+
public void waitTillCacheServerIsReady(String serverName, int serverPort) {
await().atMost(1, TimeUnit.MINUTES)
.until(() -> getCacheServerMXBean(serverName, serverPort) != null);