http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9cf7ea2c/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java index 1cb23a1..9c080f3 100644 --- a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java @@ -81,25 +81,25 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion; * This class tests the ContiunousQuery mechanism in GemFire. * It does so by creating a cache server with a cache and a pre-defined region and * a data loader. The client creates the same region and attaches the connection pool. - * + * * * @author anil */ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { - + /** The port on which the bridge server was started in this VM */ private static int bridgeServerPort; - + protected static int port = 0; protected static int port2 = 0; - + public static int noTest = -1; - + public final String[] regions = new String[] { "regionA", "regionB" }; - + private final static int CREATE = 0; private final static int UPDATE = 1; private final static int DESTROY = 2; @@ -113,20 +113,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { static private final String WAIT_PROPERTY = "CqQueryTest.maxWaitTime"; static private final int WAIT_DEFAULT = (20 * 1000); - - public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, + + public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue(); public final String[] cqs = new String [] { //0 - Test for ">" "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0", - + //1 - Test for "=" and "and". "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and p.status='active'", - + //2 - Test for "<" and "and". "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active'", - + // FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING LOGIC WITHIN CQ. //3 "SELECT * FROM /root/" + regions[0] + " ;", @@ -148,21 +148,21 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'", //11 - Test for "No Alias" "SELECT ALL * FROM /root/" + regions[0] + " where ID > 0", - + }; - + private String[] invalidCQs = new String [] { // Test for ">" "SELECT ALL * FROM /root/invalidRegion p where p.ID > 0" }; - + public CqQueryUsingPoolDUnitTest(String name) { super(name); } - + public void setUp() throws Exception { super.setUp(); - + //We're seeing this on the server when the client //disconnects. IgnoredException.addIgnoredException("Connection reset"); @@ -178,14 +178,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { getSystem(); } }); - + } - + /* Returns Cache Server Port */ static int getCacheServerPort() { return bridgeServerPort; } - + /* Create Cache Server */ public void createServer(VM server) { @@ -201,7 +201,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { MirrorType mirrorType = MirrorType.KEYS_VALUES; createServer(server, thePort, eviction, mirrorType); } - + public void createServer(VM server, final int thePort, final boolean eviction, final MirrorType mirrorType) { SerializableRunnable createServer = new CacheSerializableRunnable( @@ -223,7 +223,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { for (int i = 0; i < regions.length; i++) { createRegion(regions[i], factory.createRegionAttributes()); } - + try { startBridgeServer(thePort, true); } @@ -231,13 +231,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { catch (Exception ex) { Assert.fail("While starting CacheServer", ex); } - + } }; server.invoke(createServer); } - + /** * Create a bridge server with partitioned region. * @param server VM where to create the bridge server. @@ -255,7 +255,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //AttributesFactory factory = new AttributesFactory(); //factory.setScope(Scope.DISTRIBUTED_ACK); //factory.setMirrorType(MirrorType.KEYS_VALUES); - + //int maxMem = 0; AttributesFactory attr = new AttributesFactory(); //attr.setValueConstraint(valueConstraint); @@ -265,7 +265,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } PartitionAttributes prAttr = paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create(); attr.setPartitionAttributes(prAttr); - + assertFalse(getSystem().isLoner()); //assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size() > 0); for (int i = 0; i < regions.length; i++) { @@ -278,14 +278,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { catch (Exception ex) { Assert.fail("While starting CacheServer", ex); } - + } }; server.invoke(createServer); } - - + + /* Close Cache Server */ public void closeServer(VM server) { server.invoke(new SerializableRunnable("Close CacheServer") { @@ -295,15 +295,15 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + /* Create Client */ public void createClient(VM client, final int serverPort, final String serverHost) { int[] serverPorts = new int[] {serverPort}; - createClient(client, serverPorts, serverHost, null, null); + createClient(client, serverPorts, serverHost, null, null); } - + /* Create Client */ - public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel, + public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel, final String poolName) { SerializableRunnable createQService = new CacheSerializableRunnable("Create Client") { @@ -317,10 +317,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + AttributesFactory regionFactory = new AttributesFactory(); regionFactory.setScope(Scope.LOCAL); - + if (poolName != null) { regionFactory.setPoolName(poolName); } else { @@ -330,14 +330,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { ClientServerTestCase.configureConnectionPool(regionFactory, serverHost,serverPorts, true, -1, -1, null); } } - for (int i=0; i < regions.length; i++) { + for (int i=0; i < regions.length; i++) { createRegion(regions[i], regionFactory.createRegionAttributes()); LogWriterUtils.getLogWriter().info("### Successfully Created Region on Client :" + regions[i]); //region1.getAttributesMutator().setCacheListener(new CqListener()); } } }; - + client.invoke(createQService); } @@ -353,10 +353,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception ex) { LogWriterUtils.getLogWriter().info("### Failed to get CqService during ClientClose() ###"); } - + } }; - + client.invoke(closeCQService); Wait.pause(1000); } @@ -417,13 +417,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } LogWriterUtils.getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size()); } - + }); } - + /** * support for invalidating values. - */ + */ public void invalidateValues(VM vm, final String regionName, final int size) { vm.invoke(new CacheSerializableRunnable("Create values") { public void run2() throws CacheException { @@ -433,41 +433,41 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } LogWriterUtils.getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size()); } - + }); } public void createPool(VM vm, String poolName, String server, int port) { - createPool(vm, poolName, new String[]{server}, new int[]{port}); + createPool(vm, poolName, new String[]{server}, new int[]{port}); } public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports) { createPool(vm, poolName, servers, ports, null); } - + public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports, final String redundancyLevel) { vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) { public void run2() throws CacheException { // Create Cache. getCache(); IgnoredException.addIgnoredException("java.net.ConnectException"); - + PoolFactory cpf = PoolManager.createFactory(); cpf.setSubscriptionEnabled(true); if (redundancyLevel != null){ int redundancy = Integer.parseInt(redundancyLevel); cpf.setSubscriptionRedundancy(redundancy); - } - + } + for (int i=0; i < servers.length; i++){ LogWriterUtils.getLogWriter().info("### Adding to Pool. ### Server : " + servers[i] + " Port : " + ports[i]); cpf.addServer(servers[i], ports[i]); } - + cpf.create(poolName); } - }); + }); } /* Register CQs */ @@ -477,7 +477,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //pause(60 * 1000); //getLogWriter().info("### DEBUG CREATE CQ START ####"); //pause(20 * 1000); - + LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName); // Get CQ Service. QueryService qService = null; @@ -490,10 +490,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { CqAttributesFactory cqf = new CqAttributesFactory(); CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())}; ((CqQueryTestListener)cqListeners[0]).cqName = cqName; - + cqf.initCqListeners(cqListeners); CqAttributes cqa = cqf.create(); - + // Create CQ. try { CqQuery cq1 = qService.newCq(cqName, queryStr, cqa); @@ -505,7 +505,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { throw err; } } - }); + }); } // REMOVE.......... @@ -515,7 +515,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //pause(60 * 1000); //getLogWriter().info("### DEBUG CREATE CQ START ####"); //pause(20 * 1000); - + LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName); // Get CQ Service. QueryService qService = null; @@ -528,10 +528,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { CqAttributesFactory cqf = new CqAttributesFactory(); CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())}; ((CqQueryTestListener)cqListeners[0]).cqName = cqName; - + cqf.initCqListeners(cqListeners); CqAttributes cqa = cqf.create(); - + // Create CQ. try { CqQuery cq1 = qService.newCq(cqName, queryStr, cqa); @@ -543,7 +543,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { throw err; } } - }); + }); } /* Register CQs with no name, execute, and close*/ @@ -553,7 +553,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //pause(60 * 1000); LogWriterUtils.getLogWriter().info("### DEBUG CREATE CQ START ####"); //pause(20 * 1000); - + LogWriterUtils.getLogWriter().info("### Create CQ with no name. ###"); // Get CQ Service. QueryService qService = null; @@ -565,16 +565,16 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + SelectResults cqResults = null; for (int i = 0; i < 20; ++i) { // Create CQ Attributes. CqAttributesFactory cqf = new CqAttributesFactory(); CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())}; - + cqf.initCqListeners(cqListeners); CqAttributes cqa = cqf.create(); - + // Create CQ with no name and execute with initial results. try { cq1 = qService.newCq(queryStr, cqa); @@ -583,7 +583,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { LogWriterUtils.getLogWriter().info("CQService is :" + qService); Assert.fail("Failed to create CQ with no name" + " . ", ex); } - + if (cq1 == null) { LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ with no name."); } @@ -597,7 +597,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { cqResults = cq1.executeWithInitialResults(); } catch (Exception ex){ LogWriterUtils.getLogWriter().info("CqService is :" + qService); - Assert.fail("Failed to execute CQ with initial results, cq name: " + Assert.fail("Failed to execute CQ with initial results, cq name: " + cqName + " . ", ex); } LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size()); @@ -614,7 +614,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { LogWriterUtils.getLogWriter().info("CQ state after execute = " + cq1.getState()); assertTrue("execute() state mismatch", cq1.getState().isRunning()); } - + //Close the CQ try { cq1.close(); @@ -625,20 +625,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { assertTrue("closeCq() state mismatch", cq1.getState().isClosed()); } } - }); + }); } - + public void executeCQ(VM vm, final String cqName, final boolean initialResults, String expectedErr) { executeCQ(vm, cqName, initialResults, noTest, null, expectedErr); - } - + } + /** * Execute/register CQ as running. * @param initialResults true if initialResults are requested * @param expectedResultsSize if >= 0, validate results against this size * @param expectedErr if not null, an error we expect - */ + */ public void executeCQ(VM vm, final String cqName, final boolean initialResults, final int expectedResultsSize, @@ -650,7 +650,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //pause(60 * 1000); LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####"); //pause(20 * 1000); - + // Get CQ Service. QueryService cqService = null; CqQuery cq1 = null; @@ -663,7 +663,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // throw err; // fail("Failed to getCQService."); // } - + // Get CqQuery object. try { cq1 = cqService.getCq(cqName); @@ -682,10 +682,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { err.initCause(ex); throw err; } - + if (initialResults) { SelectResults cqResults = null; - + try { cqResults = cq1.executeWithInitialResults(); } catch (Exception ex){ @@ -698,11 +698,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size()); assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning()); if (expectedResultsSize >= 0) { - assertEquals("Unexpected results size for CQ: " + cqName + - " CQ Query :" + cq1.getQueryString(), + assertEquals("Unexpected results size for CQ: " + cqName + + " CQ Query :" + cq1.getQueryString(), expectedResultsSize, cqResults.size()); } - + if (expectedKeys != null) { HashSet resultKeys = new HashSet(); for (Object o : cqResults.asList()) { @@ -710,14 +710,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { resultKeys.add(s.get("key")); } for (int i =0; i < expectedKeys.length; i++){ - assertTrue("Expected key :" + expectedKeys[i] + - " Not found in CqResults for CQ: " + cqName + - " CQ Query :" + cq1.getQueryString() + + assertTrue("Expected key :" + expectedKeys[i] + + " Not found in CqResults for CQ: " + cqName + + " CQ Query :" + cq1.getQueryString() + " Keys in CqResults :" + resultKeys, resultKeys.contains(expectedKeys[i])); } } - } + } else { try { cq1.execute(); @@ -732,7 +732,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { assertTrue("execute() state mismatch", cq1.getState().isRunning()); } } - + public void run2() throws CacheException { if (expectedErr != null) { getCache().getLogger().info("<ExpectedException action=add>" @@ -740,7 +740,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } try { work(); - } + } finally { if (expectedErr != null) { getCache().getLogger().info("<ExpectedException action=remove>" @@ -748,9 +748,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } } - }); + }); } - + /* Stop/pause CQ */ public void stopCQ(VM vm, final String cqName) throws Exception { vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) { @@ -763,7 +763,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + // Stop CQ. CqQuery cq1 = null; try { @@ -776,7 +776,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + // Stop and execute CQ repeatedly /* Stop/pause CQ */ private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception { @@ -791,14 +791,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCqService.", cqe); } - + // Get CQ. try { cq1 = cqService.getCq(cqName); } catch (Exception ex){ Assert.fail("Failed to get CQ " + cqName + " . ", ex); } - + for (int i = 0; i < count; ++i) { // Stop CQ. try { @@ -809,7 +809,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { assertTrue("Stop CQ state mismatch, count = " + i, cq1.getState().isStopped()); LogWriterUtils.getLogWriter().info("After stop in Stop and Execute loop, ran successfully, loop count: " + i); LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState()); - + // Re-execute CQ try { cq1.execute(); @@ -823,8 +823,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - - + + /* UnRegister CQs */ public void closeCQ(VM vm, final String cqName) throws Exception { vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) { @@ -837,7 +837,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCqService.", cqe); } - + // Close CQ. CqQuery cq1 = null; try { @@ -850,12 +850,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + /* Register CQs */ public void registerInterestListCQ(VM vm, final String regionName, final int keySize, final boolean all) { vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") { public void run2() throws CacheException { - + // Get CQ Service. Region region = null; try { @@ -867,7 +867,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { throw err; } - + try { if (all) { region.registerInterest("ALL_KEYS"); @@ -884,22 +884,22 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { throw err; } } - }); + }); } - + /* Validate CQ Count */ public void validateCQCount(VM vm, final int cqCnt) throws Exception { vm.invoke(new CacheSerializableRunnable("validate cq count") { public void run2() throws CacheException { // Get CQ Service. - + QueryService cqService = null; try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + int numCqs = 0; try { numCqs = cqService.getCqs().length; @@ -910,9 +910,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - - - /** + + + /** * Throws AssertionError if the CQ can be found or if any other * error occurs */ @@ -922,12 +922,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { LogWriterUtils.getLogWriter().info("### Fail if CQ Exists. ### " + cqName); // Get CQ Service. QueryService cqService = null; - try { + try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + CqQuery cQuery = cqService.getCq(cqName); if (cQuery != null) { fail("Unexpectedly found CqQuery for CQ : " + cqName); @@ -935,31 +935,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + private void validateCQError(VM vm, final String cqName, final int numError) { vm.invoke(new CacheSerializableRunnable("Validate CQs") { public void run2() throws CacheException { - + LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName); // Get CQ Service. QueryService cqService = null; - try { + try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + CqQuery cQuery = cqService.getCq(cqName); if (cQuery == null) { fail("Failed to get CqQuery for CQ : " + cqName); } - + CqAttributes cqAttr = cQuery.getCqAttributes(); CqListener cqListener = cqAttr.getCqListener(); CqQueryTestListener listener = (CqQueryTestListener) cqListener; listener.printInfo(false); - + // Check for totalEvents count. if (numError != noTest) { // Result size validation. @@ -969,7 +969,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates, @@ -978,7 +978,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { validateCQ(vm, cqName, resultSize, creates, updates, deletes, noTest, noTest, noTest, noTest); } - + public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates, @@ -993,29 +993,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName); // Get CQ Service. QueryService cqService = null; - try { + try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + CqQuery cQuery = cqService.getCq(cqName); if (cQuery == null) { fail("Failed to get CqQuery for CQ : " + cqName); } - + CqAttributes cqAttr = cQuery.getCqAttributes(); CqListener cqListeners[] = cqAttr.getCqListeners(); CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0]; listener.printInfo(false); - + // Check for totalEvents count. if (totalEvents != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Total Event Count mismatch", totalEvents, listener.getTotalEventCount()); } - + if (resultSize != noTest) { //SelectResults results = cQuery.getCqResults(); //getLogWriter().info("### CQ Result Size is :" + results.size()); @@ -1025,72 +1025,72 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { fail("test for event counts instead of results size"); // assertEquals("Result Size mismatch", resultSize, listener.getTotalEventCount()); } - + // Check for create count. if (creates != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Create Event mismatch", creates, listener.getCreateEventCount()); } - + // Check for update count. if (updates != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Update Event mismatch", updates, listener.getUpdateEventCount()); } - + // Check for delete count. if (deletes != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Delete Event mismatch", deletes, listener.getDeleteEventCount()); } - + // Check for queryInsert count. if (queryInserts != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Query Insert Event mismatch", queryInserts, listener.getQueryInsertEventCount()); } - + // Check for queryUpdate count. if (queryUpdates != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Query Update Event mismatch", queryUpdates, listener.getQueryUpdateEventCount()); } - + // Check for queryDelete count. if (queryDeletes != noTest) { // Result size validation. listener.printInfo(true); assertEquals("Query Delete Event mismatch", queryDeletes, listener.getQueryDeleteEventCount()); - } + } } }); } - + public void waitForCreated(VM vm, final String cqName, final String key){ waitForEvent(vm, 0, cqName, key); } - + public void waitForUpdated(VM vm, final String cqName, final String key){ waitForEvent(vm, 1, cqName, key); } - + public void waitForDestroyed(VM vm, final String cqName, final String key){ waitForEvent(vm, 2, cqName, key); } - + public void waitForInvalidated(VM vm, final String cqName, final String key){ waitForEvent(vm, 3, cqName, key); } - + public void waitForClose(VM vm, final String cqName){ waitForEvent(vm, 4, cqName, null); } - + public void waitForRegionClear(VM vm, final String cqName){ waitForEvent(vm, 5, cqName, null); } @@ -1109,53 +1109,53 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + CqQuery cQuery = cqService.getCq(cqName); if (cQuery == null) { fail("Failed to get CqQuery for CQ : " + cqName); } - + CqAttributes cqAttr = cQuery.getCqAttributes(); CqListener[] cqListener = cqAttr.getCqListeners(); CqQueryTestListener listener = (CqQueryTestListener) cqListener[0]; - + switch (event) { case CREATE : listener.waitForCreated(key); break; - + case UPDATE : listener.waitForUpdated(key); break; - + case DESTROY : listener.waitForDestroyed(key); - break; - + break; + case INVALIDATE : listener.waitForInvalidated(key); - break; - + break; + case CLOSE : listener.waitForClose(); - break; + break; case REGION_CLEAR : listener.waitForRegionClear(); - break; + break; case REGION_INVALIDATE : listener.waitForRegionInvalidate(); - break; + break; } } }); } - + /** * Waits till the CQ state is same as the expected. - * Waits for max time, if the CQ state is not same as expected + * Waits for max time, if the CQ state is not same as expected * throws exception. */ public void waitForCqState(VM vm, final String cqName, final int state) { @@ -1179,9 +1179,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Wait max time, till the CQ state is as expected. final long start = System.currentTimeMillis(); while (cqState.getState() != state) { - junit.framework.Assert.assertTrue("Waited over " + MAX_TIME + junit.framework.Assert.assertTrue("Waited over " + MAX_TIME + "ms for Cq State to be changed to " + state - + "; consider raising " + WAIT_PROPERTY, + + "; consider raising " + WAIT_PROPERTY, (System.currentTimeMillis() - start) < MAX_TIME); Wait.pause(100); } @@ -1193,33 +1193,33 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { vm.invoke(new CacheSerializableRunnable("validate cq count") { public void run2() throws CacheException { // Get CQ Service. - + QueryService cqService = null; try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + CqQuery cQuery = cqService.getCq(cqName); if (cQuery == null) { fail("Failed to get CqQuery for CQ : " + cqName); } - + CqAttributes cqAttr = cQuery.getCqAttributes(); CqListener cqListener = cqAttr.getCqListener(); CqQueryTestListener listener = (CqQueryTestListener) cqListener; - listener.getEventHistory(); + listener.getEventHistory(); } }); } - + public void validateQuery(VM vm, final String query, final int resultSize) { vm.invoke(new CacheSerializableRunnable("Validate Query") { public void run2() throws CacheException { LogWriterUtils.getLogWriter().info("### Validating Query. ###"); QueryService qs = getCache().getQueryService(); - + Query q = qs.newQuery(query); try { Object r = q.execute(); @@ -1235,9 +1235,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - + private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) { - + Properties props = new Properties(); String endPoints = ""; String host = hosts[0]; @@ -1252,13 +1252,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { endPoints = endPoints + ","; } } - + props.setProperty("endpoints", endPoints); props.setProperty("retryAttempts", "1"); //props.setProperty("establishCallbackConnection", "true"); //props.setProperty("LBPolicy", "Sticky"); //props.setProperty("readTimeout", "120000"); - + // Add other property elements. if (newProps != null) { Enumeration e = newProps.keys(); @@ -1269,8 +1269,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } return props; } - - + + // Exercise CQ attributes mutator functions private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) throws Exception { vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) { @@ -1284,7 +1284,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + // Get CQ. try { cq1 = cqService.getCq(cqName); @@ -1297,30 +1297,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { switch (mutator_function) { case CREATE: // Reinitialize with 2 CQ Listeners - CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()), + CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()), new CqQueryTestListener(getCache().getLogger())}; cqAttrMutator.initCqListeners(cqListenersArray); cqListeners = cqAttr.getCqListeners(); assertEquals("CqListener count mismatch", cqListeners.length, 2); break; - + case UPDATE: // Add 2 new CQ Listeners CqListener newListener1 = new CqQueryTestListener(getCache().getLogger()); CqListener newListener2 = new CqQueryTestListener(getCache().getLogger()); cqAttrMutator.addCqListener(newListener1); cqAttrMutator.addCqListener(newListener2); - + cqListeners = cqAttr.getCqListeners(); assertEquals("CqListener count mismatch", cqListeners.length, 3); break; - + case DESTROY: cqListeners = cqAttr.getCqListeners(); cqAttrMutator.removeCqListener(cqListeners[0]); cqListeners = cqAttr.getCqListeners(); assertEquals("CqListener count mismatch", cqListeners.length, 2); - + // Remove a listener and validate cqAttrMutator.removeCqListener(cqListeners[0]); cqListeners = cqAttr.getCqListeners(); @@ -1330,10 +1330,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }); } - - - - + + + + /** * Test for InterestList and CQ registered from same clients. * @throws Exception @@ -1347,30 +1347,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testInterestListAndCQs"; createPool(client, poolName, host0, thePort); - + createClient(client, thePort, host0); /* Create CQs. */ - createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]); + createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]); validateCQCount(client, 1); /* Init values at server. */ final int size = 10; - - executeCQ(client, "testInterestListAndCQs_0", false, null); + + executeCQ(client, "testInterestListAndCQs_0", false, null); registerInterestListCQ(client, regions[0], size, false); - + createValues(server, regions[0], size); // Wait for client to Synch. - + for (int i=1; i <=10; i++){ waitForCreated(client, "testInterestListAndCQs_0", KEY + i); } Wait.pause(5 * 1000); - + // validate CQs. validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, @@ -1381,17 +1381,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - - + + // Validate InterestList. // CREATE client.invoke(new CacheSerializableRunnable("validate updates") { public void run2() throws CacheException { final Region region = getRootRegion().getSubregion(regions[0]); assertNotNull(region); - + // Set keys = region.entrySet(); -// assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", +// assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", // size, keys.size()); // TODO does this WaitCriterion actually help? WaitCriterion wc = new WaitCriterion() { @@ -1401,7 +1401,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { if (sz == size) { return true; } - excuse = "Mismatch, number of keys (" + sz + + excuse = "Mismatch, number of keys (" + sz + ") in local region is not equal to the interest list size (" + size + ")"; return false; @@ -1411,7 +1411,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }; Wait.waitForCriterion(wc, 60 * 1000, 1000, true); - + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); for (int i = 1; i <= 10; i++) { ctl.waitForCreated(KEY+i); @@ -1419,24 +1419,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + // UPDATE createValues(server, regions[0], size); // Wait for client to Synch. for (int i=1; i <=10; i++){ waitForUpdated(client, "testInterestListAndCQs_0", KEY + i); } - - + + client.invoke(new CacheSerializableRunnable("validate updates") { public void run2() throws CacheException { Region region = getRootRegion().getSubregion(regions[0]); assertNotNull(region); - + Set keys = region.entrySet(); - assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", + assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", size, keys.size()); - + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); for (int i = 1; i <= 10; i++) { ctl.waitForUpdated(KEY+i); @@ -1444,7 +1444,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + // INVALIDATE server.invoke(new CacheSerializableRunnable("Invalidate values") { public void run2() throws CacheException { @@ -1454,20 +1454,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - - + + waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10); - - + + client.invoke(new CacheSerializableRunnable("validate invalidates") { public void run2() throws CacheException { Region region = getRootRegion().getSubregion(regions[0]); assertNotNull(region); - + Set keys = region.entrySet(); - assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", + assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", size, keys.size()); - + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); for (int i = 1; i <= 10; i++) { ctl.waitForInvalidated(KEY+i); @@ -1475,7 +1475,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size, @@ -1485,7 +1485,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ size, /* queryDeletes: */ size, /* totalEvents: */ size * 3); - + // DESTROY - this should not have any effect on CQ, as the events are // already destroyed from invalidate events. server.invoke(new CacheSerializableRunnable("Invalidate values") { @@ -1502,7 +1502,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { public void run2() throws CacheException { Region region = getRootRegion().getSubregion(regions[0]); assertNotNull(region); - + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); for (int i = 1; i <= 10; i++) { ctl.waitForDestroyed(KEY+i); @@ -1523,14 +1523,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { closeClient(client); closeServer(server); } - - + + /** * Test for CQ register and UnRegister. * @throws Exception */ public void testCQStopExecute() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); @@ -1539,29 +1539,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testCQStopExecute"; createPool(client, poolName, host0, thePort); - + //createClient(client, thePort, host0); /* Create CQs. */ - createCQ(client, poolName, "testCQStopExecute_0", cqs[0]); + createCQ(client, poolName, "testCQStopExecute_0", cqs[0]); validateCQCount(client, 1); - + executeCQ(client, "testCQStopExecute_0", false, null); /* Init values at server. */ int size = 10; createValues(server, regions[0], size); // Wait for client to Synch. - + waitForCreated(client, "testCQStopExecute_0", KEY+size); - - + + // Check if Client and Server in sync. //validateServerClientRegionEntries(server, client, regions[0]); - validateQuery(server, cqs[0], 10); + validateQuery(server, cqs[0], 10); // validate CQs. //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest); validateCQ(client, "testCQStopExecute_0", @@ -1573,10 +1573,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Test CQ stop stopCQ(client, "testCQStopExecute_0"); - + // Test CQ re-enable executeCQ(client, "testCQStopExecute_0", false, null); @@ -1585,10 +1585,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Wait for client to Synch. waitForCreated(client, "testCQStopExecute_0", KEY+20); size = 30; - + // Check if Client and Server in sync. //validateServerClientRegionEntries(server, client, regions[0]); - validateQuery(server, cqs[0], 20); + validateQuery(server, cqs[0], 20); // validate CQs. //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest); validateCQ(client, "testCQStopExecute_0", @@ -1600,19 +1600,19 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 10, /* queryDeletes: */ 0, /* totalEvents: */ size); - - + + // Stop and execute CQ 20 times stopExecCQ(client, "testCQStopExecute_0", 20); - + // Test CQ Close closeCQ(client, "testCQStopExecute_0"); - + // Close. closeClient(client); closeServer(server); } - + /** * Test for CQ Attributes Mutator functions * @throws Exception @@ -1626,23 +1626,23 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testCQAttributesMutator"; createPool(client, poolName, host0, thePort); //createClient(client, thePort, host0); /* Create CQs. */ String cqName = new String("testCQAttributesMutator_0"); - createCQ(client, poolName, cqName, cqs[0]); - validateCQCount(client, 1); - executeCQ(client,cqName, false, null); + createCQ(client, poolName, cqName, cqs[0]); + validateCQCount(client, 1); + executeCQ(client,cqName, false, null); /* Init values at server. */ int size = 10; createValues(server, regions[0], size); // Wait for client to Synch. waitForCreated(client, cqName, KEY + size); - + // validate CQs. validateCQ(client, cqName, /* resultSize: */ noTest, @@ -1653,14 +1653,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Add 2 new CQ Listeners mutateCQAttributes(client, cqName, UPDATE); /* Init values at server. */ createValues(server, regions[0], size * 2); waitForCreated(client, cqName, KEY + (size * 2)); - + validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 20, @@ -1670,10 +1670,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 10, /* queryDeletes: */ 0, /* totalEvents: */ 30); - + // Remove 2 listeners and validate mutateCQAttributes(client, cqName, DESTROY); - + validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 10, @@ -1683,7 +1683,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 10, /* queryDeletes: */ 0, /* totalEvents: */ 20); - + // Reinitialize with 2 CQ Listeners mutateCQAttributes(client, cqName, CREATE); @@ -1691,7 +1691,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { deleteValues(server, regions[0], 20); // Wait for client to Synch. waitForDestroyed(client, cqName, KEY + (size * 2)); - + validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 0, @@ -1701,21 +1701,21 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 20, /* totalEvents: */ 20); - + // Close CQ closeCQ(client, cqName); - + // Close. closeClient(client); closeServer(server); } - + /** * Test for CQ register and UnRegister. * @throws Exception */ public void testCQCreateClose() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); @@ -1724,7 +1724,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testCQCreateClose"; System.out.println("##### Pool Name :" + poolName + " host :" + host0 + " port :" + thePort); createPool(client, poolName, host0, thePort); @@ -1737,9 +1737,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { //getLogWriter().info("### DEBUG START ####"); /* Create CQs. */ - createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); + createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); validateCQCount(client, 1); - + executeCQ(client, "testCQCreateClose_0", false, null); /* Init values at server. */ @@ -1747,10 +1747,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createValues(server, regions[0], size); // Wait for client to Synch. waitForCreated(client, "testCQCreateClose_0", KEY+size); - + // Check if Client and Server in sync. //validateServerClientRegionEntries(server, client, regions[0]); - validateQuery(server, cqs[0], 10); + validateQuery(server, cqs[0], 10); // validate CQs. validateCQ(client, "testCQCreateClose_0", /* resultSize: */ noTest, @@ -1761,29 +1761,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Test CQ stop stopCQ(client, "testCQCreateClose_0"); - + // Test CQ re-enable executeCQ(client, "testCQCreateClose_0", false, null); - + // Test CQ Close closeCQ(client, "testCQCreateClose_0"); - + //Create CQs with no name, execute, and close. // UNCOMMENT.... - createAndExecCQNoName(client, poolName, cqs[0]); - + createAndExecCQNoName(client, poolName, cqs[0]); + // Accessing the closed CQ. failIfCQExists(client, "testCQCreateClose_0"); - + // re-Create the cq which is closed. createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); /* Test CQ Count */ validateCQCount(client, 1); - + // Registering CQ with same name from same client. try { createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); @@ -1795,43 +1795,43 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { assertTrue("Got wrong exception: " + causeCause.getClass().getName(), causeCause instanceof CqExistsException); } - + // Getting values from non-existent CQ. failIfCQExists(client, "testCQCreateClose_NO"); - + // Server Registering CQ. try { createCQ(server, "testCQCreateClose_1", cqs[0]); fail("Trying to create CQ on Cache Server. Should have thrown Exception."); } catch (com.gemstone.gemfire.test.dunit.RMIException rmiExc) { Throwable cause = rmiExc.getCause(); - assertTrue("unexpected cause: " + cause.getClass().getName(), + assertTrue("unexpected cause: " + cause.getClass().getName(), cause instanceof AssertionError); Throwable causeCause = cause.getCause(); // should be a IllegalStateException assertTrue("Got wrong exception: " + causeCause.getClass().getName(), causeCause instanceof IllegalStateException); } - + validateCQCount(client, 1); - + createCQ(client, poolName, "testCQCreateClose_3", cqs[2]); - + validateCQCount(client, 2); /* Test for closeAllCQs() */ - + client.invoke(new CacheSerializableRunnable("CloseAll CQ :") { public void run2() throws CacheException { LogWriterUtils.getLogWriter().info("### Close All CQ. ###"); // Get CQ Service. QueryService cqService = null; - try { + try { cqService = getCache().getQueryService(); } catch (Exception cqe) { LogWriterUtils.getLogWriter().info("Failed to getCQService.", cqe); Assert.fail("Failed to getCQService.", cqe); } - + // Close CQ. try { cqService.closeCqs(); @@ -1841,30 +1841,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + validateCQCount(client, 0); - + // Initialize. createCQ(client, poolName, "testCQCreateClose_2", cqs[1]); createCQ(client, poolName, "testCQCreateClose_4", cqs[1]); createCQ(client, poolName, "testCQCreateClose_5", cqs[1]); - + // Execute few of the initialized cqs executeCQ(client, "testCQCreateClose_4", false, null); executeCQ(client, "testCQCreateClose_5", false, null); - + // Call close all CQ. client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") { public void run2() throws CacheException { LogWriterUtils.getLogWriter().info("### Close All CQ 2. ###"); // Get CQ Service. QueryService cqService = null; - try { + try { cqService = getCache().getQueryService(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); } - + // Close CQ. try { cqService.closeCqs(); @@ -1873,12 +1873,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + // Close. closeClient(client); closeServer(server); } - + /** * This will test the events after region destory. * The CQs on the destroy region needs to be closed. @@ -1893,31 +1893,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testRegionDestroy"; createPool(client, poolName, host0, thePort); - + createClient(client, thePort, host0); /* Create CQs. */ - createCQ(client, poolName, "testRegionDestroy_0", cqs[0]); - createCQ(client, poolName, "testRegionDestroy_1", cqs[0]); - createCQ(client, poolName, "testRegionDestroy_2", cqs[0]); - + createCQ(client, poolName, "testRegionDestroy_0", cqs[0]); + createCQ(client, poolName, "testRegionDestroy_1", cqs[0]); + createCQ(client, poolName, "testRegionDestroy_2", cqs[0]); + executeCQ(client, "testRegionDestroy_0", false, null); executeCQ(client, "testRegionDestroy_1", false, null); executeCQ(client, "testRegionDestroy_2", false, null); /* Init values at server. */ final int size = 10; - registerInterestListCQ(client, regions[0], size, false); + registerInterestListCQ(client, regions[0], size, false); createValues(server, regions[0], size); - + // Wait for client to Synch. - + waitForCreated(client, "testRegionDestroy_0", KEY + 10); - - + + // validate CQs. validateCQ(client, "testRegionDestroy_0", /* resultSize: */ noTest, @@ -1928,7 +1928,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Validate InterestList. // CREATE client.invoke(new CacheSerializableRunnable("validate updates") { @@ -1954,11 +1954,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } }; Wait.waitForCriterion(wc, 30 * 1000, 250, true); - + Region region = getRootRegion().getSubregion(regions[0]); assertNotNull(region); - - CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); for (int i = 1; i <= 10; i++) { ctl.waitForCreated(KEY+i); @@ -1966,7 +1966,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + // Destroy Region. server.invoke(new CacheSerializableRunnable("Destroy Region") { public void run2() throws CacheException { @@ -1974,20 +1974,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { region1.destroyRegion(); } }); - + Wait.pause(2 * 1000); validateCQCount(client, 0); - + closeClient(client); closeServer(server); - + } - + /** * Test for CQ with multiple clients. */ public void testCQWithMultipleClients() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client1 = host.getVM(1); @@ -1998,11 +1998,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createServer(server); final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName1 = "testCQWithMultipleClients1"; String poolName2 = "testCQWithMultipleClients2"; String poolName3 = "testCQWithMultipleClients3"; - + createPool(client1, poolName1, host0, thePort); createPool(client2, poolName2, host0, thePort); @@ -2012,12 +2012,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqs[0]); executeCQ(client2, "testCQWithMultipleClients_0", false, null); - + int size = 10; - + // Create Values on Server. createValues(server, regions[0], size); - + waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10); /* Validate the CQs */ @@ -2030,9 +2030,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10); - + validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ size, @@ -2046,7 +2046,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* Close test */ closeCQ(client1, "testCQWithMultipleClients_0"); - + validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ size, @@ -2059,19 +2059,19 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* Init new client and create cq */ createPool(client3, poolName3, host0, thePort); - + createCQ(client3, poolName3, "testCQWithMultipleClients_0", cqs[0]); createCQ(client3, poolName3, "testCQWithMultipleClients_1", cqs[1]); executeCQ(client3, "testCQWithMultipleClients_0", false, null); executeCQ(client3, "testCQWithMultipleClients_1", false, null); - + // Update values on Server. This will be updated on new Client CQs. createValues(server, regions[0], size); - - + + waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10); - - + + validateCQ(client3, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ 0, @@ -2081,7 +2081,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size); - + validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0, @@ -2099,17 +2099,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* Close Client Test */ closeClient(client1); - + clearCQListenerEvents(client2, "testCQWithMultipleClients_0"); clearCQListenerEvents(client3, "testCQWithMultipleClients_1"); - + // Update values on server, update again. createValues(server, regions[0], size); - - + + waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10); - - + + validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ size, @@ -2119,9 +2119,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3); - + waitForUpdated(client3, "testCQWithMultipleClients_1", KEY + 2); - + validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0, @@ -2137,24 +2137,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { closeClient(client3); closeServer(server); } - + /** * Test for CQ ResultSet. */ public void testCQResultSet() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testCQResultSet"; createPool(client, poolName, host0, thePort); - + // Create client. // createClient(client, thePort, host0); @@ -2162,10 +2162,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { int size = 10; createValues(server, regions[0], size); Wait.pause(1*500); - + // Create CQs. - createCQ(client, poolName, "testCQResultSet_0", cqs[0]); - + createCQ(client, poolName, "testCQResultSet_0", cqs[0]); + // Check resultSet Size. executeCQ(client, "testCQResultSet_0", true, 10, null, null); @@ -2174,11 +2174,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Check resultSet Size. executeCQ(client, "testCQResultSet_1", true, 0, null, null); stopCQ(client, "testCQResultSet_1"); - + // Init values. - createValues(server, regions[1], 5); + createValues(server, regions[1], 5); validateQuery(server, cqs[2], 2); - + executeCQ(client, "testCQResultSet_1", true, 2, null, null); /* compare values... @@ -2205,24 +2205,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } */ - + // Close. - closeClient(client); + closeClient(client); closeServer(server); } - + /** * Test for CQ Listener events. * */ public void testCQEvents() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); @@ -2231,18 +2231,18 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Create client. //createClient(client, thePort, host0); - + // Create CQs. createCQ(client, poolName, "testCQEvents_0", cqs[0]); - - executeCQ(client, "testCQEvents_0", false, null); - + + executeCQ(client, "testCQEvents_0", false, null); + // Init values at server. int size = 10; createValues(server, regions[0], size); - + waitForCreated(client, "testCQEvents_0", KEY+size); - + // validate Create events. validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, @@ -2253,14 +2253,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Update values. createValues(server, regions[0], 5); createValues(server, regions[0], 10); - + waitForUpdated(client, "testCQEvents_0", KEY+size); - - + + // validate Update events. validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, @@ -2271,11 +2271,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 0, /* totalEvents: */ size + 15); - + // Validate delete events. deleteValues(server, regions[0], 5); waitForDestroyed(client, "testCQEvents_0", KEY+5); - + validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size, @@ -2285,7 +2285,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5); - + // Insert invalid Events. server.invoke(new CacheSerializableRunnable("Create values") { public void run2() throws CacheException { @@ -2297,7 +2297,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + Wait.pause(1 * 1000); // cqs should not get any creates, deletes or updates. rdubey. validateCQ(client, "testCQEvents_0", @@ -2309,25 +2309,25 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5); - + // Close. closeClient(client); closeServer(server); } - + /** * Test query execution multiple times on server without ALIAS. * @throws Exception */ public void testCqEventsWithoutAlias() throws Exception { - + final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); @@ -2336,18 +2336,18 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Create client. //createClient(client, thePort, host0); - + // Create CQs. createCQ(client, poolName, "testCQEvents_0", cqs[11]); - - executeCQ(client, "testCQEvents_0", false, null); - + + executeCQ(client, "testCQEvents_0", false, null); + // Init values at server. int size = 10; createValues(server, regions[0], size); - + waitForCreated(client, "testCQEvents_0", KEY+size); - + // validate Create events. validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, @@ -2358,14 +2358,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size); - + // Update values. createValues(server, regions[0], 5); createValues(server, regions[0], 10); - + waitForUpdated(client, "testCQEvents_0", KEY+size); - - + + // validate Update events. validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, @@ -2376,11 +2376,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 0, /* totalEvents: */ size + 15); - + // Validate delete events. deleteValues(server, regions[0], 5); waitForDestroyed(client, "testCQEvents_0", KEY+5); - + validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size, @@ -2390,7 +2390,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5); - + // Insert invalid Events. server.invoke(new CacheSerializableRunnable("Create values") { public void run2() throws CacheException { @@ -2402,7 +2402,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + Wait.pause(1 * 1000); // cqs should not get any creates, deletes or updates. rdubey. validateCQ(client, "testCQEvents_0", @@ -2414,7 +2414,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5); - + // Close. closeClient(client); closeServer(server); @@ -2427,9 +2427,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); @@ -2438,7 +2438,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { // Create client. //createClient(client, thePort, host0); - + // Create CQs. createCQ(client, poolName, "testEnableDisable_0", cqs[0]); executeCQ(client, "testEnableDisable_0", false, null); @@ -2453,10 +2453,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { cqService.stopCqs(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); - } - } + } + } }); - + Wait.pause(1 * 1000); // Init values at server. int size = 10; @@ -2483,11 +2483,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { cqService.executeCqs(); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); - } + } } }); Wait.pause(1 * 1000); - createValues(server, regions[0], size); + createValues(server, regions[0], size); waitForUpdated(client, "testEnableDisable_0", KEY+size); // It gets created on the CQs validateCQ(client, "testEnableDisable_0", @@ -2510,13 +2510,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { cqService.stopCqs("/root/" + regions[0]); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); - } + } } }); - + Wait.pause(2 * 1000); deleteValues(server, regions[0], size / 2); - Wait.pause(1 * 500); + Wait.pause(1 * 500); // There should not be any deletes. validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, @@ -2538,11 +2538,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { cqService.executeCqs("/root/" + regions[0]); } catch (Exception cqe) { Assert.fail("Failed to getCQService.", cqe); - } + } } }); Wait.pause(1 * 1000); - createValues(server, regions[0], size / 2); + createValues(server, regions[0], size / 2); waitForCreated(client, "testEnableDisable_0", KEY+(size / 2)); // Gets updated on the CQ. validateCQ(client, "testEnableDisable_0", @@ -2554,12 +2554,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { /* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size * 3 / 2); - + // Close. closeClient(client); - closeServer(server); + closeServer(server); } - + /** * Test for Complex queries. * @throws Exception @@ -2568,42 +2568,42 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + String poolName = "testQuery"; createPool(client, poolName, host0, thePort); - + // Create client. createClient(client, thePort, host0); - + // Create CQs. createCQ(client, poolName, "testQuery_3", cqs[3]); executeCQ(client, "testQuery_3", true, null); - + createCQ(client, poolName, "testQuery_4", cqs[4]); executeCQ(client, "testQuery_4", true, null); - + createCQ(client, poolName, "testQuery_5", cqs[5]); executeCQ(client, "testQuery_5", true, null); - + createCQ(client, poolName, "testQuery_6", cqs[6]); executeCQ(client, "testQuery_6", true, null); - + createCQ(client, poolName, "testQuery_7", cqs[7]); executeCQ(client, "testQuery_7", true, null); - + createCQ(client, poolName, "testQuery_8", cqs[8]); executeCQ(client, "testQuery_8", true, null); - + // Close. closeClient(client); closeServer(server); } - + /** * Test for CQ Fail over. * @throws Exception @@ -2613,17 +2613,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { VM server1 = host.getVM(0); VM server2 = host.getVM(1); VM client = host.getVM(2); - + createServer(server1); - + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server1.getHost()); // Create client. // Properties props = new Properties(); // Create client with redundancyLevel -1 - + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1); - + //createClient(client, new int[] {port1, ports[0]}, host0, "-1"); String poolName = "testCQFailOver"; @@ -2636,63 +2636,63 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { executeCQ(client, "testCQFailOver_" + i, false, null); } Wait.pause(1 * 1000); - + // CREATE. createValues(server1, regions[0], 10); createValues(server1, regions[1], 10); waitForCreated(client, "testCQFailOver_0", KEY+10); Wait.pause(1 * 1000); - + createServer(server2, ports[0]); final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); - System.out.println("### Port on which server1 running : " + port1 + + System.out.println("### Port on which server1 running : " + port1 + " Server2 running : " + thePort2); Wait.pause(3 * 1000); // Extra pause - added after downmerging trunk r17050 Wait.pause(5 * 1000); - + // UPDATE - 1. - createValues(server1, regions[0], 10); + createValues(server1, regions[0], 10); createValues(server1, regions[1], 10); - + waitForUpdated(client, "testCQFailOver_0", KEY+10); - + int[] resultsCnt = new int[] {10, 1, 2}; - + for (int i=0; i < numCQs; i++) { validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest); - } - + } + // Close server1. closeServer(server1); - + // Fail over should happen. Wait.pause(3 * 1000); - + for (int i=0; i < numCQs; i++) { validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest); - } - + } + // UPDATE - 2 this.clearCQListenerEvents(client, "testCQFailOver_0"); createValues(server2, regions[0], 10); createValues(server2, regions[1], 10); - + for (int i=1; i <= 10; i++) { waitForUpdated(client, "testCQFailOver_0", KEY+i); } - + for (int i=0; i < numCQs; i++) { validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest); - } - + } + // Close. closeClient(client); closeServer(server2); } - + /** * Test for CQ Fail over/HA with redundancy level set. * @throws Exception @@ -2702,37 +2702,37 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { VM server1 = host.getVM(0); VM server2 = host.getVM(1); VM server3 = host.getVM(2); - + VM client = host.getVM(3); - + //Killing servers can cause this message on the client side. IgnoredException.addIgnoredException("Could not find any server"); - + createServer(server1); - + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server1.getHost()); - + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); - + createServer(server2, ports[0]); final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); - + createServer(server3, ports[1]); final int port3 = server3.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); - System.out.println("### Port on which server1 running : " + port1 + - " server2 running : " + thePort2 + + System.out.println("### Port on which server1 running : " + port1 + + " server2 running : " + thePort2 + " Server3 running : " + port3); - - + + // Create client - With 3 server endpoints and redundancy level set to 2. - + String poolName = "testCQStopExecute"; createPool(client, poolName, new String[] {host0, host0, host0}, new int[] {port1, thePort2, port3}); - + // Create client with redundancyLevel 1 //createClient(client, new int[] {port1, thePort2, port3}, host0, "1"); - + // Create CQs. int numCQs = 1; for (int i=0; i < numCQs; i++) { @@ -2740,64 +2740,64 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createCQ(client, poolName, "testCQHA_" + i, cqs[i]); executeCQ(client, "testCQHA_" + i, false, null); } - + Wait.pause(1 * 1000); - + // CREATE. createValues(server1, regions[0], 10); createValues(server1, regions[1], 10); - - + + waitForCreated(client, "testCQHA_0", KEY + 10); - - + + // Clients expected initial result. int[] resultsCnt = new int[] {10, 1, 2}; - + // Close server1. // To maintain the redundancy; it will make connection to endpoint-3. closeServer(server1); Wait.pause(3 * 1000); - + // UPDATE-1. createValues(server2, regions[0], 10); createValues(server2, regions[1], 10); - - + + waitForUpdated(client, "testCQHA_0", KEY + 10); - - + + // Validate CQ. for (int i=0; i < numCQs; i++) { validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest); - } - + } + // Close server-2 closeServer(server2); Wait.pause(2 * 1000); - + // UPDATE - 2. clearCQListenerEvents(client, "testCQHA_0"); - - createValues(server3, regions[0], 10); + + createValues(server3, regions[0], 10); createValues(server3, regions[1], 10); - + // Wait for events at client. - + waitForUpdated(client, "testCQHA_0", KEY + 10); - - + + for (int i=0; i < numCQs; i++) { validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest); - } - + } + // Close. closeClient(client); closeServer(server3); } /** - * Test Filter registration during GII. + * Test Filter registration during GII. * Bug fix 39014 * @throws Exception */ @@ -2808,22 +2808,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { VM client1 = host.getVM(2); VM client2 = host.getVM(3); - Wait.pause(3 * 1000); - createServer(server1); - + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server1.getHost()); - + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1); - + String poolName = "testFilterRegistrationDuringGII"; createPool(client1, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1"); createPool(client2, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1"); - + createClient(client1, new int[] {port1, ports[0]}, host0, "-1", poolName); createClient(client2, new int[] {port1, ports[0]}, host0, "-1", poolName); - + // Create CQs. final int numCQs = 2; for (int i=0; i < numCQs; i++) { @@ -2833,22 +2831,23 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { createCQ(client2, poolName, "client2_" + i, cqs[i]); executeCQ(client2, "client2_" + i, false, null); } - + final int interestSize = 20; registerInterestListCQ(client1, regions[0], interestSize, false); registerInterestListCQ(client2, regions[0], 0, true); - + Wait.pause(1 * 1000); - + // CREATE. createValues(server1, regions[0], 100); createValues(server1, regions[1], 10); - + waitForCreated(client1, "client1_0", KEY + 10); // Create server2. - server2.invoke(new CacheSerializableRunnable("Create Cache Server") { - public void run2() throws CacheException + server2.invoke(new CacheSerializableRunnable("Create Cache Server",new Object[]{100}) { + public void run2() throws CacheException{} + public void run3() throws CacheException { LogWriterUtils.getLogWriter().info("### Create Cache Server. ###"); AttributesFactory factory = new AttributesFactory(); @@ -2868,7 +2867,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } while(true) { - if (InitialImageOperation.slowImageSleeps > 0) { +// if (InitialImageOperation.slowImageSleeps > 0) { + if ((int) args[0] > 0) { // Create events while GII for HARegion is in progress. LocalRegion region1 = (LocalRegion)getRootRegion().getSubregion(regions[0]); for (int i = 90; i <= 120; i++) { @@ -2886,8 +2886,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { ); Wait.pause(3 * 1000); - - + + // Check if CQs are registered as part of GII. server2.invoke(new CacheSerializableRunnable("Create values") { public void run2() throws CacheException { @@ -2896,11 +2896,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { Iterator iter = proxies.iterator(); try { for (CacheClientProxy p : proxies){ - ClientProxyMembershipID clientId = p.getProxyID(); + ClientProxyMembershipID clientId = p.getProxyID(); List cqs = qs.getCqService().getAllClientCqs(clientId); getCache().getLogger().fine("Number of CQs found for client :" + clientId + " are :" + cqs.size()); if (cqs.size() != numCQs) { - fail("Number of CQs registerted by the client is :" + cqs.size() + + fail("Number of CQs registerted by the client is :" + cqs.size() + " less than expected : " + numCQs); } CqQuery cq = (CqQuery)cqs.get(0); @@ -2940,15 +2940,15 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { VM server1 = host.getVM(0); VM server2 = host.getVM(1); VM client = host.getVM(2); - + createServer(server1); createServer(server2); - + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server1.getHost()); - + final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); - + SerializableRunnable createConnectionPool = new CacheSerializableRunnable("Create region") { public void run2() throws CacheException { @@ -2956,17 +2956,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { IgnoredException.addIgnoredException("java.net.ConnectException||java.net.SocketException"); AttributesFactory regionFactory = new AttributesFactory(); regionFactory.setScope(Scope.LOCAL); - + ClientServerTestCase.configureConnectionPool(regionFactory, host0, port1, thePort2, true, -1, -1, null); - + createRegion(regions[0], regionFactory.createRegionAttributes()); } }; - - + + // Create client. client.invoke(createConnectionPool); - + server1.invoke(new CacheSerializableRunnable("Create values") { public void run2() throws CacheException { Region region1 = getRootRegion().getSubregion(regions[0]); @@ -2975,25 +2975,25 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { } } }); - + // Put some values on the client. client.invoke(new CacheSerializableRunnable("Put values client") { public void run2() throws CacheException { Region region1 = getRootRegion().getSubregion(regions[0]); - + for (int i = 0; i < 10; i++) { region1.put("key-string-"+i, "client-value-"+i); } } }); - - + + Wait.pause(2 * 1000); closeServer(server1); closeServer(server2); } - - + + /** * Test getCQs for a regions */ @@ -3001,37 +3001,37 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { final Host host = Host.getHost(0); VM server = host.getVM(0); VM client = host.getVM(1); - + createServer(server); - + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); final String host0 = NetworkUtils.getServerHostName(server.getHost()); - + // Create client. // createClient(client, thePort, host0); - + String poolName = "testGetCQsForARegionName"; createPool(client, poolName, host0, thePort); - + // Create CQs. createCQ(client, poolName, "testQuery_3", cqs[3]); executeCQ(client, "testQuery_3", true, null); - + createCQ(client, poolName, "testQuery_4", cqs[4]); executeCQ(client, "testQuery_4", true, null); - + createCQ(client, poolName, "testQuery_5", cqs[5]); executeCQ(client, "testQuery_5", true, null); - + createCQ(client, poolName, "testQuery_6", cqs[6]); executeCQ(client, "testQuery_6", true, null); //with regions[1] createCQ(client, poolName, "testQuery_7", cqs[7]); executeCQ(client, "testQuery_7", true, null); - + createCQ(client, poolName, "testQuery_8", cqs[8]); executeCQ(client, "testQuery_8", true, null); - + client.invoke(new CacheSerializableRunnable("Client disableCQs()") { public void run2() throws CacheException { // Get CQ Service. @@ -3042,31 +3042,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[0], cq); getCache().getLogger().info("cqs for region: /root/"+regions[0]+" : "+cq.length); // closing on of the cqs. - + cq[0].close(); cq = cqService.getCqs("/root/"+regions[0]); assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[0], cq); getCache().getLogger().info("cqs for region: /root/"+regions[0]+" after closeing one of the cqs : "+cq.length); - + cq = cqService.getCqs("/root/"+regions[1]); getCache().getLogger().info("cqs for region: /root/"+regions[1]+" : "+cq.length); assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[1], cq); } catch (Exception cqe) { Assert.fail("Failed to getCQService", cqe); - } + } } }); - + // Close. closeClient(client); closeServer(server); - + } - + /** * Tests execution of queries with NULL in where clause like where ID = NULL * etc. - * + * * @throws Exception */ public void testQueryWithNULLInWhereClause() throws Exception @@ -3084,10 +3084,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase { String poolName = "testQueryWithNULLInWhereClause"
<TRUNCATED>