This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4e0c8aa GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104) 4e0c8aa is described below commit 4e0c8aa6937ad2b5935a11994138381fa29a8644 Author: Jianxia Chen <11181423+jche...@users.noreply.github.com> AuthorDate: Thu Mar 11 11:19:39 2021 -0800 GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104) For PutAll and RemoveAll, when removing destroy token from CQ result keys, use the keys in the individual entry events, instead of using the key in the base event. --- .../internal/cache/DistributedCacheOperation.java | 25 ++-- .../internal/cache/DistributedPutAllOperation.java | 20 +++ .../cache/DistributedRemoveAllOperation.java | 20 +++ .../cache/DistributedCacheOperationTest.java | 24 ++++ .../cache/DistributedPutAllOperationTest.java | 51 ++++++++ .../cache/DistributedRemoveAllOperationTest.java | 51 ++++++++ .../dunit/PartitionedRegionCqQueryDUnitTest.java | 136 +++++++++++++++++++++ 7 files changed, 317 insertions(+), 10 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 3e55045..b2f2767 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -719,7 +719,7 @@ public abstract class DistributedCacheOperation { private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) { for (InternalDistributedMember m : filterRouting.getMembers()) { FilterInfo filterInfo = filterRouting.getFilterInfo(m); - if (filterInfo.getCQs() == null) { + if (filterInfo == null || filterInfo.getCQs() == null) { continue; } @@ -734,15 +734,20 @@ public abstract class DistributedCacheOperation { for (Object value : cf.filterProfile.getCqMap().values()) { 
ServerCQ cq = (ServerCQ) value; - for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) { - Long cqID = e.getKey(); - // For the CQs satisfying the event with destroy CQEvent, remove - // the entry form CQ cache. - if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID) - && (e.getValue().equals(MessageType.LOCAL_DESTROY))) { - cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true); - } - } + doRemoveDestroyTokensFromCqResultKeys(filterInfo, cq); + } + } + } + + void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) { + for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) { + Long cqID = e.getKey(); + // For the CQs satisfying the event with destroy CQEvent, remove + // the entry form CQ cache. + if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID) + && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY) + && ((EntryOperation) event).getKey() != null) { + cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true); } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java index 9a9737e..842d7cd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; @@ -39,6 +40,7 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException; import org.apache.geode.cache.query.internal.cq.CqService; +import 
org.apache.geode.cache.query.internal.cq.ServerCQ; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DirectReplyProcessor; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -46,6 +48,7 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.partitioned.PutAllPRMessage; +import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.tx.RemotePutAllMessage; @@ -814,6 +817,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation { return consolidated; } + @Override + void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) { + for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) { + Long cqID = e.getKey(); + // For the CQs satisfying the event with destroy CQEvent, remove + // the entry from CQ cache. 
+ for (int i = 0; i < this.putAllData.length; i++) { + @Unretained + EntryEventImpl entryEvent = getEventForPosition(i); + if (entryEvent != null && entryEvent.getKey() != null && cq != null + && cq.getFilterID() != null && cq.getFilterID().equals(cqID) + && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) { + cq.removeFromCqResultKeys(entryEvent.getKey(), true); + } + } + } + } @Override protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java index f76c701..942a4dd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Set; import org.apache.logging.log4j.Logger; @@ -33,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException; import org.apache.geode.cache.query.internal.cq.CqService; +import org.apache.geode.cache.query.internal.cq.ServerCQ; import org.apache.geode.distributed.internal.DirectReplyProcessor; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.InternalDataSerializer; @@ -40,6 +42,7 @@ import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsL import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo; import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage; +import org.apache.geode.internal.cache.tier.MessageType; import 
org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage; @@ -584,6 +587,23 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation { return consolidated; } + @Override + void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) { + for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) { + Long cqID = e.getKey(); + // For the CQs satisfying the event with destroy CQEvent, remove + // the entry from CQ cache. + for (int i = 0; i < this.removeAllData.length; i++) { + @Unretained + EntryEventImpl entryEvent = getEventForPosition(i); + if (entryEvent != null && entryEvent.getKey() != null && cq != null + && cq.getFilterID() != null && cq.getFilterID().equals(cqID) + && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) { + cq.removeFromCqResultKeys(entryEvent.getKey(), true); + } + } + } + } @Override protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java index a97fecc..606fac6 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java @@ -17,6 +17,8 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -28,9 +30,11 @@ import java.util.Map; 
import org.junit.Test; import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.query.internal.cq.ServerCQ; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage; import org.apache.geode.internal.cache.persistence.PersistentMemberID; +import org.apache.geode.internal.cache.tier.MessageType; public class DistributedCacheOperationTest { @@ -106,4 +110,24 @@ public class DistributedCacheOperationTest { throw new RuntimeException("boom"); } } + + @Test + public void testDoRemoveDestroyTokensFromCqResultKeys() { + Object key = new Object(); + HashMap hashMap = new HashMap(); + hashMap.put(1L, MessageType.LOCAL_DESTROY); + EntryEventImpl baseEvent = mock(EntryEventImpl.class); + ServerCQ serverCQ = mock(ServerCQ.class); + FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class); + DistributedCacheOperation distributedCacheOperation = + new DestroyOperation(baseEvent); + when(baseEvent.getKey()).thenReturn(key); + when(filterInfo.getCQs()).thenReturn(hashMap); + when(serverCQ.getFilterID()).thenReturn(new Long(1L)); + doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class)); + + distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ); + + verify(serverCQ, times(1)).removeFromCqResultKeys(key, true); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java index a06920e..da0c1cf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java @@ -15,11 +15,25 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; +import 
static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.HashMap; + import org.junit.Test; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.query.internal.cq.CqService; +import org.apache.geode.cache.query.internal.cq.ServerCQ; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.cache.tier.MessageType; + public class DistributedPutAllOperationTest { @@ -33,4 +47,41 @@ public class DistributedPutAllOperationTest { assertThat(mockDistributedPutAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl); } + + @Test + public void testDoRemoveDestroyTokensFromCqResultKeys() { + EntryEventImpl baseEvent = mock(EntryEventImpl.class); + EntryEventImpl entryEvent = mock(EntryEventImpl.class); + BucketRegion bucketRegion = mock(BucketRegion.class); + InternalCache internalCache = mock(InternalCache.class); + RegionAttributes regionAttributes = mock(RegionAttributes.class); + FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class); + CqService cqService = mock(CqService.class); + InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class); + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + ServerCQ serverCQ = mock(ServerCQ.class); + int putAllPRDataSize = 1; + DistributedPutAllOperation distributedPutAllOperation = + new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false); + Object key = new Object(); + when(entryEvent.getKey()).thenReturn(key); + distributedPutAllOperation.addEntry(entryEvent); + HashMap hashMap = new HashMap(); + hashMap.put(1L, MessageType.LOCAL_DESTROY); + 
when(filterInfo.getCQs()).thenReturn(hashMap); + when(baseEvent.getRegion()).thenReturn(bucketRegion); + when(bucketRegion.getAttributes()).thenReturn(regionAttributes); + when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT); + when(bucketRegion.getCache()).thenReturn(internalCache); + when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion); + when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null)); + when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem); + when(internalCache.getCqService()).thenReturn(cqService); + when(serverCQ.getFilterID()).thenReturn(new Long(1L)); + doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class)); + + distributedPutAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ); + + verify(serverCQ, times(1)).removeFromCqResultKeys(key, true); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java index bd84116..7f2eb46 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java @@ -15,11 +15,25 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.HashMap; + import org.junit.Test; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.RegionAttributes; +import 
org.apache.geode.cache.query.internal.cq.CqService; +import org.apache.geode.cache.query.internal.cq.ServerCQ; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.cache.tier.MessageType; + public class DistributedRemoveAllOperationTest { @@ -33,4 +47,41 @@ public class DistributedRemoveAllOperationTest { assertThat(mockDistributedRemoveAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl); } + + @Test + public void testDoRemoveDestroyTokensFromCqResultKeys() { + EntryEventImpl baseEvent = mock(EntryEventImpl.class); + EntryEventImpl entryEvent = mock(EntryEventImpl.class); + BucketRegion bucketRegion = mock(BucketRegion.class); + InternalCache internalCache = mock(InternalCache.class); + RegionAttributes regionAttributes = mock(RegionAttributes.class); + InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class); + FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class); + CqService cqService = mock(CqService.class); + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + ServerCQ serverCQ = mock(ServerCQ.class); + int removeAllPRDataSize = 1; + DistributedRemoveAllOperation distributedRemoveAllOperation = + new DistributedRemoveAllOperation(baseEvent, removeAllPRDataSize, false); + Object key = new Object(); + when(entryEvent.getKey()).thenReturn(key); + distributedRemoveAllOperation.addEntry(entryEvent); + HashMap hashMap = new HashMap(); + hashMap.put(1L, MessageType.LOCAL_DESTROY); + when(filterInfo.getCQs()).thenReturn(hashMap); + when(baseEvent.getRegion()).thenReturn(bucketRegion); + when(bucketRegion.getAttributes()).thenReturn(regionAttributes); + when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion); + when(bucketRegion.getCache()).thenReturn(internalCache); + when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null)); + 
when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT); + when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem); + when(internalCache.getCqService()).thenReturn(cqService); + when(serverCQ.getFilterID()).thenReturn(new Long(1L)); + doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class)); + + distributedRemoveAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ); + + verify(serverCQ, times(1)).removeFromCqResultKeys(key, true); + } } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java index 3f520f6..6d2e979 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java @@ -22,9 +22,14 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull; import static org.apache.geode.test.dunit.Assert.assertNull; import static org.apache.geode.test.dunit.Assert.assertTrue; import static org.apache.geode.test.dunit.Assert.fail; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -121,6 +126,137 @@ public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase { private static int bridgeServerPort; @Test + public void testPutAllWithCQLocalDestroy() { + VM server1 = getVM(0); + VM server2 = getVM(1); + VM client = getVM(2); + + final String cqName = "testPutAllWithCQLocalDestroy_0"; + createServer(server1); + createServer(server2); + final String host = 
VM.getHostName(); + final int port = server2.invoke(() -> getCacheServerPort()); + createClient(client, port, host); + createCQ(client, cqName, cqs[0]); + + int numObjects = 1000; + + server1.invoke(() -> { + Region<String, Object> region = + getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]); + Map<String, Object> buffer = new HashMap(); + for (int i = 1; i < numObjects; i++) { + Portfolio p = new Portfolio(i); + buffer.put("" + i, p); + } + region.putAll(buffer); + assertThat(region.size()).isEqualTo(numObjects - 1); + }); + + client.invoke(() -> { + QueryService cqService = getCache().getQueryService(); + CqQuery cqQuery = cqService.getCq(cqName); + assertThat(cqQuery) + .withFailMessage("Failed to get CQ " + cqName) + .isNotNull(); + cqQuery.executeWithInitialResults(); + }); + + server1.invoke(() -> { + Region<String, Object> region = + getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]); + // PutAll with entries that do not satisfy CQ. This is to generate LOCAL_DESTROY CQ event + Map<String, Object> buffer = new HashMap(); + for (int i = 1; i < numObjects; i++) { + Portfolio p = new Portfolio(-1 * i); + buffer.put("" + i, p); + } + region.putAll(buffer); + assertThat(region.size()).isEqualTo(numObjects - 1); + }); + + client.invoke(() -> { + QueryService cqService = getCache().getQueryService(); + CqQuery cqQuery = cqService.getCq(cqName); + assertThat(cqQuery) + .withFailMessage("Failed to get CQ " + cqName) + .isNotNull(); + CqQueryTestListener cqListener = + (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener(); + assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1); + }); + + cqHelper.closeClient(client); + cqHelper.closeServer(server2); + cqHelper.closeServer(server1); + } + + @Test + public void testRemoveAllWithCQLocalDestroy() { + VM server1 = getVM(0); + VM server2 = getVM(1); + VM client = getVM(2); + + final String cqName = "testRemoveAllWithCQLocalDestroy_0"; + createServer(server1); + 
createServer(server2); + final String host = VM.getHostName(); + final int port = server2.invoke(() -> getCacheServerPort()); + createClient(client, port, host); + createCQ(client, cqName, cqs[0]); + + int numObjects = 1000; + + server1.invoke(() -> { + Region<String, Object> region = + getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]); + Map<String, Object> buffer = new HashMap(); + for (int i = 1; i < numObjects; i++) { + Portfolio p = new Portfolio(i); + buffer.put("" + i, p); + } + region.putAll(buffer); + assertThat(region.size()).isEqualTo(numObjects - 1); + }); + + client.invoke(() -> { + QueryService cqService = getCache().getQueryService(); + CqQuery cqQuery = cqService.getCq(cqName); + assertThat(cqQuery) + .withFailMessage("Failed to get CQ " + cqName) + .isNotNull(); + cqQuery.executeWithInitialResults(); + }); + + server1.invoke(() -> { + Region<String, Object> region = + getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]); + Set<String> keys = new HashSet<>(); + for (int i = 1; i < numObjects; i++) { + keys.add("" + i); + } + // This is to generate LOCAL_DESTROY CQ event + region.removeAll(keys); + assertThat(region.size()).isEqualTo(0); + }); + + client.invoke(() -> { + QueryService cqService = getCache().getQueryService(); + CqQuery cqQuery = cqService.getCq(cqName); + assertThat(cqQuery) + .withFailMessage("Failed to get CQ " + cqName) + .isNotNull(); + CqQueryTestListener cqListener = + (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener(); + assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1); + }); + + cqHelper.closeClient(client); + cqHelper.closeServer(server2); + cqHelper.closeServer(server1); + } + + @Test public void testCQLeakWithPartitionedRegion() throws Exception { // creating servers. final Host host = Host.getHost(0);