Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-420 c8c2bdd8a -> a97ea4ee5


GEODE-1830: Use event, not entry, to check for a tombstone

In LocalRegion.basicPutPart2, check for a TOMBSTONE using
EntryEvent.getNewValue. Previously we were checking the
RegionEntry.isTombstone, but in the case of a PROXY region, the
RegionEntry is marker that does not set isTombstone to true.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c8b2b3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c8b2b3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c8b2b3b

Branch: refs/heads/feature/GEODE-420
Commit: 0c8b2b3b8f9f3da6ccf25fd17ab9313a22ea9d0a
Parents: 5cb5009
Author: Dan Smith <[email protected]>
Authored: Mon Aug 29 17:40:45 2016 -0700
Committer: Dan Smith <[email protected]>
Committed: Tue Aug 30 16:13:25 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/LocalRegion.java     |    3 +-
 .../cache30/ClientServerCCEDUnitTest.java       | 1350 +++++++++---------
 2 files changed, 707 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c8b2b3b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 461ad3d..41b9578 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -6091,7 +6091,8 @@ public class LocalRegion extends AbstractRegion
       boolean clearConflict)
   {
     final boolean isNewKey = event.getOperation().isCreate();
-    final boolean invokeCallbacks = !entry.isTombstone(); // put() is creating 
a tombstone
+    //Invoke callbacks only if we are not creating a tombstone
+    final boolean invokeCallbacks = event.basicGetNewValue() != 
Token.TOMBSTONE;
 
     if (isNewKey) {
       updateStatsForCreate();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c8b2b3b/geode-core/src/test/java/com/gemstone/gemfire/cache30/ClientServerCCEDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/com/gemstone/gemfire/cache30/ClientServerCCEDUnitTest.java
 
b/geode-core/src/test/java/com/gemstone/gemfire/cache30/ClientServerCCEDUnitTest.java
index 957dcc0..25bf705 100644
--- 
a/geode-core/src/test/java/com/gemstone/gemfire/cache30/ClientServerCCEDUnitTest.java
+++ 
b/geode-core/src/test/java/com/gemstone/gemfire/cache30/ClientServerCCEDUnitTest.java
@@ -1,645 +1,705 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache30;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.InterestResultPolicy;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.NetworkUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-/**
- * concurrency-control tests for client/server
- * 
- *
- */
-@Category(DistributedTest.class)
-public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
-  public static LocalRegion TestRegion;
-  
-  public void setup() {
-    // for bug #50683 we need a short queue-removal-message processing interval
-    HARegionQueue.setMessageSyncInterval(5);
-  }
-  
-  @Override
-  public final void preTearDownCacheTestCase() {
-    disconnectAllFromDS();
-    
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
-  }
-
-  public ClientServerCCEDUnitTest() {
-    super();
-  }
-
-  @Test
-  public void testClientServerRRTombstoneGC() {
-    clientServerTombstoneGCTest(getUniqueName(), true);
-  }
-  
-  @Test
-  public void testClientServerPRTombstoneGC() {
-    clientServerTombstoneGCTest(getUniqueName(), false);
-  }
-  
-  @Test
-  public void testPutAllInNonCCEClient() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    final String name = this.getUniqueName() + "Region";
-    
-    int port = createServerRegion(vm0, name, true);
-    createClientRegion(vm1, name, port, false);
-    doPutAllInClient(vm1);
-  }
-  
-  
-  /**
-   * test that distributed GC messages are sent to clients and properly 
processed
-   * @param replicatedRegion whether to use a RR or PR in the servers
-   */
-  private void clientServerTombstoneGCTest(String uniqueName, boolean 
replicatedRegion) {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    VM vm3 = host.getVM(3);
-    final String name = uniqueName + "Region";
-
-
-    createServerRegion(vm0, name, replicatedRegion);
-    int port = createServerRegion(vm1, name, replicatedRegion);
-    createClientRegion(vm2, name, port, true);
-    createClientRegion(vm3, name, port, true);
-    createEntries(vm2);
-    destroyEntries(vm3);
-    unregisterInterest(vm3);
-    forceGC(vm0);
-    if (!replicatedRegion) {
-      //other bucket might be in vm1
-      forceGC(vm1);
-    }
-    checkClientReceivedGC(vm2);
-    checkClientDoesNotReceiveGC(vm3);
-  }
-  
-  /**
-   * for bug #40791 we pull tombstones into clients on get(), getAll() and
-   * registerInterest() to protect the client cache from stray putAll
-   * events sitting in backup queues on the server 
-   */
-  @Test
-  public void testClientRIGetsTombstonesRR() throws Exception {
-    clientRIGetsTombstoneTest(getUniqueName(),true);
-  }
-  
-  @Test
-  public void testClientRIGetsTombstonesPR() throws Exception {
-    clientRIGetsTombstoneTest(getUniqueName(),false);
-  }
-
-  /**
-   * test that clients receive tombstones in register-interest results
-   */
-  private void clientRIGetsTombstoneTest(String uniqueName, boolean 
replicatedRegion) {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    final String name = uniqueName + "Region";
-
-
-    createServerRegion(vm0, name, replicatedRegion);
-    int port = createServerRegion(vm1, name, replicatedRegion);
-    createEntries(vm0);
-    destroyEntries(vm0);
-    
-    LogWriterUtils.getLogWriter().info("***************** register interest on 
all keys");
-    createClientRegion(vm2, name, port, true);
-    registerInterest(vm2);
-    ensureAllTombstonesPresent(vm2);
-    
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on one key, Object0");
-    clearLocalCache(vm2);
-    registerInterestOneKey(vm2, "Object0");
-    List<String> keys = new ArrayList(1);
-    keys.add("Object0");
-    ensureAllTombstonesPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on four keys");
-    clearLocalCache(vm2);
-    keys = new ArrayList(4);
-    for (int i=0; i<4; i++) {
-      keys.add("Object"+i);
-    }
-    registerInterest(vm2, keys);
-    ensureAllTombstonesPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest with regex on four keys");
-    clearLocalCache(vm2);
-    registerInterestRegex(vm2, "Object[0-3]");
-    ensureAllTombstonesPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** fetch entries with 
getAll()");
-    clearLocalCache(vm2);
-    getAll(vm2);
-    ensureAllTombstonesPresent(vm2);
-  }
-  
-  @Test
-  public void testClientRIGetsInvalidEntriesRR() throws Exception {
-    clientRIGetsInvalidEntriesTest(getUniqueName(),true);
-  }
-  
-  @Test
-  public void testClientRIGetsInvalidEntriesPR() throws Exception {
-    clientRIGetsInvalidEntriesTest(getUniqueName(),false);
-  }
-
-  private void clientRIGetsInvalidEntriesTest(String uniqueName, boolean 
replicatedRegion) {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    final String name = uniqueName + "Region";
-
-
-    createServerRegion(vm0, name, replicatedRegion);
-    int port = createServerRegion(vm1, name, replicatedRegion);
-    createEntries(vm0);
-    invalidateEntries(vm0);
-    
-    LogWriterUtils.getLogWriter().info("***************** register interest on 
all keys");
-    createClientRegion(vm2, name, port, true);
-    registerInterest(vm2);
-    ensureAllInvalidsPresent(vm2);
-    
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on one key, Object0");
-    clearLocalCache(vm2);
-    registerInterestOneKey(vm2, "Object0");
-    List<String> keys = new ArrayList(1);
-    keys.add("Object0");
-    ensureAllInvalidsPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on four keys");
-    clearLocalCache(vm2);
-    keys = new ArrayList(4);
-    for (int i=0; i<4; i++) {
-      keys.add("Object"+i);
-    }
-    registerInterest(vm2, keys);
-    ensureAllInvalidsPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest with regex on four keys");
-    clearLocalCache(vm2);
-    registerInterestRegex(vm2, "Object[0-3]");
-    ensureAllInvalidsPresent(vm2, keys);
-
-    LogWriterUtils.getLogWriter().info("***************** fetch entries with 
getAll()");
-    clearLocalCache(vm2);
-    getAll(vm2);
-    ensureAllInvalidsPresent(vm2);
-  }
-
-  
-  private void registerInterest(VM vm) {
-    vm.invoke(new SerializableRunnable("register interest in all keys") {
-      public void run() {
-        TestRegion.registerInterestRegex(".*");
-      }
-    });
-  }
-  
-  private void unregisterInterest(VM vm) {
-    vm.invoke(new SerializableRunnable("unregister interest in all keys") {
-      public void run() {
-//        TestRegion.dumpBackingMap();
-        TestRegion.unregisterInterestRegex(".*");
-//        TestRegion.dumpBackingMap();
-      }
-    });
-  }
-  
-  private void registerInterest(VM vm, final List keys) {
-    vm.invoke(new SerializableRunnable("register interest in key list") {
-      public void run() {
-        TestRegion.registerInterest(keys);
-      }
-    });
-  }
-  
-  private void registerInterestOneKey(VM vm, final String key) {
-    vm.invoke(new SerializableRunnable("register interest in " + key) {
-      public void run() {
-        TestRegion.registerInterest(key);
-      }
-    });
-  }
-  
-  private void registerInterestRegex(VM vm, final String pattern) {
-    vm.invoke(new SerializableRunnable("register interest in key list") {
-      public void run() {
-        TestRegion.registerInterestRegex(pattern);
-      }
-    });
-  }
-  
-  private void ensureAllTombstonesPresent(VM vm) {
-    vm.invoke(new SerializableCallable("check all are tombstones") {
-      public Object call() {
-        for (int i=0; i<10; i++) {
-          assertTrue("expected a tombstone for Object"+i, 
TestRegion.containsTombstone("Object"+i));
-        }
-        return null;
-      }
-    });
-  }
-  
-  private void ensureAllTombstonesPresent(VM vm, final List keys) {
-    vm.invoke(new SerializableCallable("check tombstones in list") {
-      public Object call() {
-        for (Object key: keys) {
-          assertTrue("expected to find a tombstone for "+key, 
TestRegion.containsTombstone(key));
-        }
-        return null;
-      }
-    });
-  }
-  
-  private void ensureAllInvalidsPresent(VM vm) {
-    vm.invoke(new SerializableCallable("check all are tombstones") {
-      public Object call() {
-        for (int i=0; i<10; i++) {
-          assertTrue("expected to find an entry for Object"+i, 
TestRegion.containsKey("Object"+i));
-          assertTrue("expected to find entry invalid for Object"+i, 
!TestRegion.containsValue("Object"+i));
-        }
-        return null;
-      }
-    });
-  }
-  
-  private void ensureAllInvalidsPresent(VM vm, final List keys) {
-    vm.invoke(new SerializableCallable("check tombstones in list") {
-      public Object call() {
-        for (Object key: keys) {
-          assertTrue("expected to find an entry for "+key, 
TestRegion.containsKey(key));
-          assertTrue("expected to find entry invalid for "+key, 
!TestRegion.containsValue(key));
-        }
-        return null;
-      }
-    });
-  }
-
-  /* do a getAll of all keys */
-  private void getAll(VM vm) {
-    vm.invoke(new SerializableRunnable("getAll for all keys") {
-      public void run() {
-        Set<String> keys = new HashSet();
-        for (int i=0; i<10; i++) {
-          keys.add("Object"+i);
-        }
-        Map result = TestRegion.getAll(keys);
-        for (int i=0; i<10; i++) {
-          assertNull("expected no result for Object"+i, 
result.get("Object"+i));
-        }
-      }
-    });
-  }
-
-  /* this should remove all entries from the region, including tombstones */
-  private void clearLocalCache(VM vm) {
-    vm.invoke(new SerializableRunnable("clear local cache") {
-      public void run() {
-        TestRegion.localClear();
-      }
-    });
-  }
-
-  //  private void closeCache(VM vm) {
-
-  @Test
-  public void testClientServerRRQueueCleanup() {  // see bug #50879 if this 
fails
-    clientServerTombstoneMessageTest(true);
-  }
-  
-  @Test
-  public void testClientServerPRQueueCleanup() {  // see bug #50879 if this 
fails
-    clientServerTombstoneMessageTest(false);
-  }
-
-  /**
-   * test that distributed GC messages are properly cleaned out of durable
-   * client HA queues
-   */
-  private void clientServerTombstoneMessageTest(boolean replicatedRegion) {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    VM vm3 = host.getVM(3);
-    final String name = this.getUniqueName() + "Region";
-
-
-    int port1 = createServerRegion(vm0, name, replicatedRegion);
-    int port2 = createServerRegion(vm1, name, replicatedRegion);
-    createDurableClientRegion(vm2, name, port1, port2, true);
-    createDurableClientRegion(vm3, name, port1, port2, true);
-    createEntries(vm2);
-    destroyEntries(vm3);
-    forceGC(vm0);
-    if (!replicatedRegion) {
-      //other bucket might be in vm1
-      forceGC(vm1);
-    }
-    Wait.pause(5000); // better chance that WaitCriteria will succeed 1st time 
if we pause a bit
-    checkClientReceivedGC(vm2);
-    checkClientReceivedGC(vm3);
-    checkServerQueuesEmpty(vm0);
-    checkServerQueuesEmpty(vm1);
-  }
-  
-
-//  private void closeCache(VM vm) {
-//    vm.invoke(new SerializableCallable() {
-//      public Object call() throws Exception {
-//        closeCache();
-//        return null;
-//      }
-//    });
-//  }
-  
-  private void createEntries(VM vm) {
-    vm.invoke(new SerializableCallable("create entries") {
-      public Object call() {
-        for (int i=0; i<10; i++) {
-          TestRegion.create("Object"+i, Integer.valueOf(i));
-        }
-        return null;
-      }
-    });
-  }
-  
-
-  private void destroyEntries(VM vm) {
-    vm.invoke(new SerializableCallable("destroy entries") {
-      public Object call() {
-        for (int i=0; i<10; i++) {
-          TestRegion.destroy("Object"+i, Integer.valueOf(i));
-        }
-        assertEquals(0, TestRegion.size());
-        if (TestRegion.getDataPolicy().isReplicate()) {
-          assertEquals(10, TestRegion.getTombstoneCount());
-        }
-        return null;
-      }
-    });
-  }
-  
-  private void doPutAllInClient(VM vm) {
-    vm.invoke(new SerializableRunnable("do putAll") {
-      public void run() {
-        Map map = new HashMap();
-        for (int i=1000; i<1100; i++) {
-          map.put("object_"+i, i);
-        }
-        try {
-          TestRegion.putAll(map);
-          for (int i=1000; i<1100; i++) {
-            assertTrue("expected key object_"+i+" to be in the cache but it 
isn't", TestRegion.containsKey("object_"+i));
-          }
-        } catch (NullPointerException e) {
-          Assert.fail("caught NPE", e);
-        }
-      }
-    });
-  }
-  
-
-  private void invalidateEntries(VM vm) {
-    vm.invoke(new SerializableCallable("invalidate entries") {
-      public Object call() {
-        for (int i=0; i<10; i++) {
-          TestRegion.invalidate("Object"+i, Integer.valueOf(i));
-        }
-        assertEquals(10, TestRegion.size());
-        return null;
-      }
-    });
-  }
-  
-
-  private void forceGC(VM vm) {
-    vm.invoke(new SerializableCallable("force GC") {
-      public Object call() throws Exception {
-        
TestRegion.getCache().getTombstoneService().forceBatchExpirationForTests(10);
-        return null;
-      }
-    });
-  }
-  
-  private void checkClientReceivedGC(VM vm) {
-    vm.invoke(new SerializableCallable("check that GC happened") {
-      public Object call() throws Exception {
-        WaitCriterion wc = new WaitCriterion() {
-          
-          @Override
-          public boolean done() {
-            LogWriterUtils.getLogWriter().info("tombstone count = " + 
TestRegion.getTombstoneCount());
-            LogWriterUtils.getLogWriter().info("region size = " + 
TestRegion.size());
-            return TestRegion.getTombstoneCount() == 0 && TestRegion.size() == 
0;
-          }
-          
-          @Override
-          public String description() {
-            return "waiting for garbage collection to occur";
-          }
-        };
-        Wait.waitForCriterion(wc, 60000, 2000, true);
-        return null;
-      }
-    });
-  }
-        
-  private void checkServerQueuesEmpty(VM vm) {
-    vm.invoke(new SerializableCallable("check that client queues are properly 
cleared of old ClientTombstone messages") {
-
-      public Object call() throws Exception {
-        WaitCriterion wc = new WaitCriterion() {
-//          boolean firstTime = true;
-          
-          @Override
-          public boolean done() {
-            CacheClientNotifier singleton = CacheClientNotifier.getInstance();
-            Collection<CacheClientProxy> proxies = 
singleton.getClientProxies();
-//            boolean first = firstTime;
-//            firstTime = false;
-            for (CacheClientProxy proxy: proxies) {
-              if (!proxy.isPrimary()) {  // bug #50683 only applies to backup 
queues
-                int size = proxy.getQueueSize();
-                if (size > 0) {
-//                  if (first) {
-//                    ((LocalRegion)proxy.getHARegion()).dumpBackingMap();
-//                  }
-                  LogWriterUtils.getLogWriter().info("queue size ("+size+") is 
still > 0 for " + proxy.getProxyID()); 
-                  return false;
-                }
-              }
-            }
-            // also ensure that server regions have been cleaned up
-            int regionEntryCount = TestRegion.getRegionMap().size();
-            if (regionEntryCount > 0) {
-              LogWriterUtils.getLogWriter().info("TestRegion has unexpected 
entries - all should have been GC'd but we have " + regionEntryCount);
-              TestRegion.dumpBackingMap();
-              return false;
-            }
-            return true;
-          }
-          
-          @Override
-          public String description() {
-            return "waiting for queue removal messages to clear client queues";
-          }
-        };
-        Wait.waitForCriterion(wc, 60000, 2000, true);
-        return null;
-      }
-    });
-  }
-
-
-  private void checkClientDoesNotReceiveGC(VM vm) {
-    vm.invoke(new SerializableCallable("check that GC did not happen") {
-      public Object call() throws Exception {
-        if (TestRegion.getTombstoneCount() == 0) {
-          LogWriterUtils.getLogWriter().warning("region has no tombstones");
-//          TestRegion.dumpBackingMap();
-          throw new AssertionError("expected to find tombstones but region is 
empty");
-        }
-        return null;
-      }
-    });
-  }
-        
-        
-  private int createServerRegion(VM vm, final String regionName, final boolean 
replicatedRegion) {
-    SerializableCallable createRegion = new SerializableCallable() {
-      public Object call() throws Exception {
-//        TombstoneService.VERBOSE = true;
-        AttributesFactory af = new AttributesFactory();
-        if (replicatedRegion) {
-          af.setScope(Scope.DISTRIBUTED_ACK);
-          af.setDataPolicy(DataPolicy.REPLICATE);
-        } else {
-          af.setDataPolicy(DataPolicy.PARTITION);
-          af.setPartitionAttributes((new 
PartitionAttributesFactory()).setTotalNumBuckets(2).create());
-        }
-        TestRegion = (LocalRegion)createRootRegion(regionName, af.create());
-
-        CacheServer server = getCache().addCacheServer();
-        int port = AvailablePortHelper.getRandomAvailableTCPPort();
-        server.setPort(port);
-        server.start();
-        return port;
-      }
-    };
-
-    return (Integer) vm.invoke(createRegion);
-  }
-  
-  
-  
-  private void createClientRegion(final VM vm, final String regionName, final 
int port, final boolean ccEnabled) {
-    SerializableCallable createRegion = new SerializableCallable() {
-      public Object call() throws Exception {
-        ClientCacheFactory cf = new ClientCacheFactory();
-        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port);
-        cf.setPoolSubscriptionEnabled(true);
-        cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
-        ClientCache cache = getClientCache(cf);
-        ClientRegionFactory crf = 
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-        crf.setConcurrencyChecksEnabled(ccEnabled);
-        TestRegion = (LocalRegion)crf.create(regionName);
-        TestRegion.registerInterestRegex(".*", 
InterestResultPolicy.KEYS_VALUES, false, true);
-        return null;
-      }
-    };
-    vm.invoke(createRegion);
-  }
-
-  // For durable client QRM testing we need a backup queue (redundancy=1) and
-  // durable attributes.  We also need to invoke readyForEvents()
-  private void createDurableClientRegion(final VM vm, final String regionName,
-      final int port1, final int port2, final boolean ccEnabled) {
-    SerializableCallable createRegion = new SerializableCallable() {
-      public Object call() throws Exception {
-        ClientCacheFactory cf = new ClientCacheFactory();
-        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
-        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
-        cf.setPoolSubscriptionEnabled(true);
-        cf.setPoolSubscriptionRedundancy(1);
-        // bug #50683 - secondary durable queue retains all GC messages
-        cf.set(DURABLE_CLIENT_ID, "" + vm.getPid());
-        cf.set(DURABLE_CLIENT_TIMEOUT, "" + 200);
-        cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
-        ClientCache cache = getClientCache(cf);
-        ClientRegionFactory crf = 
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-        crf.setConcurrencyChecksEnabled(ccEnabled);
-        TestRegion = (LocalRegion)crf.create(regionName);
-        TestRegion.registerInterestRegex(".*", 
InterestResultPolicy.KEYS_VALUES, true, true);
-        cache.readyForEvents();
-        return null;
-      }
-    };
-    vm.invoke(createRegion);
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+/**
+ * concurrency-control tests for client/server
+ * 
+ *
+ */
+@Category(DistributedTest.class)
+public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
+  public static LocalRegion TestRegion;
+  
+  public void setup() {
+    // for bug #50683 we need a short queue-removal-message processing interval
+    HARegionQueue.setMessageSyncInterval(5);
+  }
+  
+  @Override
+  public final void preTearDownCacheTestCase() {
+    disconnectAllFromDS();
+    
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
+  }
+
+  public ClientServerCCEDUnitTest() {
+    super();
+  }
+
+  @Test
+  public void testClientServerRRTombstoneGC() {
+    clientServerTombstoneGCTest(getUniqueName(), true);
+  }
+  
+  @Test
+  public void testClientServerPRTombstoneGC() {
+    clientServerTombstoneGCTest(getUniqueName(), false);
+  }
+  
+  @Test
+  public void testPutAllInNonCCEClient() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    final String name = this.getUniqueName() + "Region";
+    
+    int port = createServerRegion(vm0, name, true);
+    createClientRegion(vm1, name, port, false, 
ClientRegionShortcut.CACHING_PROXY);
+    doPutAllInClient(vm1);
+  }
+  
+  
+  /**
+   * test that distributed GC messages are sent to clients and properly 
processed
+   * @param replicatedRegion whether to use a RR or PR in the servers
+   */
+  private void clientServerTombstoneGCTest(String uniqueName, boolean 
replicatedRegion) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    final String name = uniqueName + "Region";
+
+
+    createServerRegion(vm0, name, replicatedRegion);
+    int port = createServerRegion(vm1, name, replicatedRegion);
+    createClientRegion(vm2, name, port, true, 
ClientRegionShortcut.CACHING_PROXY);
+    createClientRegion(vm3, name, port, true, 
ClientRegionShortcut.CACHING_PROXY);
+    createEntries(vm2);
+    destroyEntries(vm3);
+    unregisterInterest(vm3);
+    forceGC(vm0);
+    if (!replicatedRegion) {
+      //other bucket might be in vm1
+      forceGC(vm1);
+    }
+    checkClientReceivedGC(vm2);
+    checkClientDoesNotReceiveGC(vm3);
+  }
+  
+  /**
+   * for bug #40791 we pull tombstones into clients on get(), getAll() and
+   * registerInterest() to protect the client cache from stray putAll
+   * events sitting in backup queues on the server 
+   */
+  @Test
+  public void testClientRIGetsTombstonesRR() throws Exception {
+    clientRIGetsTombstoneTest(getUniqueName(),true);
+  }
+  
+  @Test
+  public void testClientRIGetsTombstonesPR() throws Exception {
+    clientRIGetsTombstoneTest(getUniqueName(),false);
+  }
+
+  /**
+   * test that clients receive tombstones in register-interest results
+   */
+  private void clientRIGetsTombstoneTest(String uniqueName, boolean 
replicatedRegion) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    final String name = uniqueName + "Region";
+
+
+    createServerRegion(vm0, name, replicatedRegion);
+    int port = createServerRegion(vm1, name, replicatedRegion);
+    createEntries(vm0);
+    destroyEntries(vm0);
+    
+    LogWriterUtils.getLogWriter().info("***************** register interest on 
all keys");
+    createClientRegion(vm2, name, port, true, 
ClientRegionShortcut.CACHING_PROXY);
+    registerInterest(vm2);
+    ensureAllTombstonesPresent(vm2);
+    
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on one key, Object0");
+    clearLocalCache(vm2);
+    registerInterestOneKey(vm2, "Object0");
+    List<String> keys = new ArrayList(1);
+    keys.add("Object0");
+    ensureAllTombstonesPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on four keys");
+    clearLocalCache(vm2);
+    keys = new ArrayList(4);
+    for (int i=0; i<4; i++) {
+      keys.add("Object"+i);
+    }
+    registerInterest(vm2, keys);
+    ensureAllTombstonesPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest with regex on four keys");
+    clearLocalCache(vm2);
+    registerInterestRegex(vm2, "Object[0-3]");
+    ensureAllTombstonesPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** fetch entries with 
getAll()");
+    clearLocalCache(vm2);
+    getAll(vm2);
+    ensureAllTombstonesPresent(vm2);
+  }
+  
+  @Test
+  public void testClientRIGetsInvalidEntriesRR() throws Exception {
+    clientRIGetsInvalidEntriesTest(getUniqueName(),true);
+  }
+  
+  @Test
+  public void testClientRIGetsInvalidEntriesPR() throws Exception {
+    clientRIGetsInvalidEntriesTest(getUniqueName(),false);
+  }
+
+  private void clientRIGetsInvalidEntriesTest(String uniqueName, boolean 
replicatedRegion) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    final String name = uniqueName + "Region";
+
+
+    createServerRegion(vm0, name, replicatedRegion);
+    int port = createServerRegion(vm1, name, replicatedRegion);
+    createEntries(vm0);
+    invalidateEntries(vm0);
+    
+    LogWriterUtils.getLogWriter().info("***************** register interest on 
all keys");
+    createClientRegion(vm2, name, port, true, 
ClientRegionShortcut.CACHING_PROXY);
+    registerInterest(vm2);
+    ensureAllInvalidsPresent(vm2);
+    
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on one key, Object0");
+    clearLocalCache(vm2);
+    registerInterestOneKey(vm2, "Object0");
+    List<String> keys = new ArrayList(1);
+    keys.add("Object0");
+    ensureAllInvalidsPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest on four keys");
+    clearLocalCache(vm2);
+    keys = new ArrayList(4);
+    for (int i=0; i<4; i++) {
+      keys.add("Object"+i);
+    }
+    registerInterest(vm2, keys);
+    ensureAllInvalidsPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** clear cache and 
register interest with regex on four keys");
+    clearLocalCache(vm2);
+    registerInterestRegex(vm2, "Object[0-3]");
+    ensureAllInvalidsPresent(vm2, keys);
+
+    LogWriterUtils.getLogWriter().info("***************** fetch entries with 
getAll()");
+    clearLocalCache(vm2);
+    getAll(vm2);
+    ensureAllInvalidsPresent(vm2);
+  }
+
+  @Test
+  public void testClientCacheListenerDoesNotSeeTombstones() throws Exception {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    final String name = getUniqueName() + "Region";
+
+
+    createServerRegion(vm0, name, true);
+    int port = createServerRegion(vm1, name, true);
+    createEntries(vm0);
+    destroyEntries(vm0);
+
+
+    LogWriterUtils.getLogWriter().info("***************** register interest on 
all keys");
+    createClientRegion(vm2, name, port, true, ClientRegionShortcut.PROXY);
+    vm2.invoke(() ->
+      TestRegion.getAttributesMutator().addCacheListener(new 
RecordingCacheListener())
+    );
+
+    getAll(vm2);
+
+    vm2.invoke(() -> {
+      RecordingCacheListener listener = (RecordingCacheListener) 
TestRegion.getCacheListener();
+      assertEquals(Collections.emptyList(), listener.events);
+    });
+  }
+
+  
+  private void registerInterest(VM vm) {
+    vm.invoke(new SerializableRunnable("register interest in all keys") {
+      public void run() {
+        TestRegion.registerInterestRegex(".*");
+      }
+    });
+  }
+  
+  private void unregisterInterest(VM vm) {
+    vm.invoke(new SerializableRunnable("unregister interest in all keys") {
+      public void run() {
+//        TestRegion.dumpBackingMap();
+        TestRegion.unregisterInterestRegex(".*");
+//        TestRegion.dumpBackingMap();
+      }
+    });
+  }
+  
+  private void registerInterest(VM vm, final List keys) {
+    vm.invoke(new SerializableRunnable("register interest in key list") {
+      public void run() {
+        TestRegion.registerInterest(keys);
+      }
+    });
+  }
+  
+  private void registerInterestOneKey(VM vm, final String key) {
+    vm.invoke(new SerializableRunnable("register interest in " + key) {
+      public void run() {
+        TestRegion.registerInterest(key);
+      }
+    });
+  }
+  
+  private void registerInterestRegex(VM vm, final String pattern) {
+    vm.invoke(new SerializableRunnable("register interest in key list") {
+      public void run() {
+        TestRegion.registerInterestRegex(pattern);
+      }
+    });
+  }
+  
+  private void ensureAllTombstonesPresent(VM vm) {
+    vm.invoke(new SerializableCallable("check all are tombstones") {
+      public Object call() {
+        for (int i=0; i<10; i++) {
+          assertTrue("expected a tombstone for Object"+i, 
TestRegion.containsTombstone("Object"+i));
+        }
+        return null;
+      }
+    });
+  }
+  
+  private void ensureAllTombstonesPresent(VM vm, final List keys) {
+    vm.invoke(new SerializableCallable("check tombstones in list") {
+      public Object call() {
+        for (Object key: keys) {
+          assertTrue("expected to find a tombstone for "+key, 
TestRegion.containsTombstone(key));
+        }
+        return null;
+      }
+    });
+  }
+  
+  private void ensureAllInvalidsPresent(VM vm) {
+    vm.invoke(new SerializableCallable("check all are tombstones") {
+      public Object call() {
+        for (int i=0; i<10; i++) {
+          assertTrue("expected to find an entry for Object"+i, 
TestRegion.containsKey("Object"+i));
+          assertTrue("expected to find entry invalid for Object"+i, 
!TestRegion.containsValue("Object"+i));
+        }
+        return null;
+      }
+    });
+  }
+  
+  private void ensureAllInvalidsPresent(VM vm, final List keys) {
+    vm.invoke(new SerializableCallable("check tombstones in list") {
+      public Object call() {
+        for (Object key: keys) {
+          assertTrue("expected to find an entry for "+key, 
TestRegion.containsKey(key));
+          assertTrue("expected to find entry invalid for "+key, 
!TestRegion.containsValue(key));
+        }
+        return null;
+      }
+    });
+  }
+
+  /* do a getAll of all keys */
+  private void getAll(VM vm) {
+    vm.invoke(new SerializableRunnable("getAll for all keys") {
+      public void run() {
+        Set<String> keys = new HashSet();
+        for (int i=0; i<10; i++) {
+          keys.add("Object"+i);
+        }
+        Map result = TestRegion.getAll(keys);
+        for (int i=0; i<10; i++) {
+          assertNull("expected no result for Object"+i, 
result.get("Object"+i));
+        }
+      }
+    });
+  }
+
+  /* this should remove all entries from the region, including tombstones */
+  private void clearLocalCache(VM vm) {
+    vm.invoke(new SerializableRunnable("clear local cache") {
+      public void run() {
+        TestRegion.localClear();
+      }
+    });
+  }
+
+  //  private void closeCache(VM vm) {
+
+  @Test
+  public void testClientServerRRQueueCleanup() {  // see bug #50879 if this 
fails
+    clientServerTombstoneMessageTest(true);
+  }
+  
+  @Test
+  public void testClientServerPRQueueCleanup() {  // see bug #50879 if this 
fails
+    clientServerTombstoneMessageTest(false);
+  }
+
+  /**
+   * test that distributed GC messages are properly cleaned out of durable
+   * client HA queues
+   */
+  private void clientServerTombstoneMessageTest(boolean replicatedRegion) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    final String name = this.getUniqueName() + "Region";
+
+
+    int port1 = createServerRegion(vm0, name, replicatedRegion);
+    int port2 = createServerRegion(vm1, name, replicatedRegion);
+    createDurableClientRegion(vm2, name, port1, port2, true);
+    createDurableClientRegion(vm3, name, port1, port2, true);
+    createEntries(vm2);
+    destroyEntries(vm3);
+    forceGC(vm0);
+    if (!replicatedRegion) {
+      //other bucket might be in vm1
+      forceGC(vm1);
+    }
+    Wait.pause(5000); // better chance that WaitCriteria will succeed 1st time 
if we pause a bit
+    checkClientReceivedGC(vm2);
+    checkClientReceivedGC(vm3);
+    checkServerQueuesEmpty(vm0);
+    checkServerQueuesEmpty(vm1);
+  }
+  
+
+//  private void closeCache(VM vm) {
+//    vm.invoke(new SerializableCallable() {
+//      public Object call() throws Exception {
+//        closeCache();
+//        return null;
+//      }
+//    });
+//  }
+  
+  private void createEntries(VM vm) {
+    vm.invoke(new SerializableCallable("create entries") {
+      public Object call() {
+        for (int i=0; i<10; i++) {
+          TestRegion.create("Object"+i, Integer.valueOf(i));
+        }
+        return null;
+      }
+    });
+  }
+  
+
+  private void destroyEntries(VM vm) {
+    vm.invoke(new SerializableCallable("destroy entries") {
+      public Object call() {
+        for (int i=0; i<10; i++) {
+          TestRegion.destroy("Object"+i, Integer.valueOf(i));
+        }
+        assertEquals(0, TestRegion.size());
+        if (TestRegion.getDataPolicy().isReplicate()) {
+          assertEquals(10, TestRegion.getTombstoneCount());
+        }
+        return null;
+      }
+    });
+  }
+  
+  private void doPutAllInClient(VM vm) {
+    vm.invoke(new SerializableRunnable("do putAll") {
+      public void run() {
+        Map map = new HashMap();
+        for (int i=1000; i<1100; i++) {
+          map.put("object_"+i, i);
+        }
+        try {
+          TestRegion.putAll(map);
+          for (int i=1000; i<1100; i++) {
+            assertTrue("expected key object_"+i+" to be in the cache but it 
isn't", TestRegion.containsKey("object_"+i));
+          }
+        } catch (NullPointerException e) {
+          Assert.fail("caught NPE", e);
+        }
+      }
+    });
+  }
+  
+
+  private void invalidateEntries(VM vm) {
+    vm.invoke(new SerializableCallable("invalidate entries") {
+      public Object call() {
+        for (int i=0; i<10; i++) {
+          TestRegion.invalidate("Object"+i, Integer.valueOf(i));
+        }
+        assertEquals(10, TestRegion.size());
+        return null;
+      }
+    });
+  }
+  
+
+  private void forceGC(VM vm) {
+    vm.invoke(new SerializableCallable("force GC") {
+      public Object call() throws Exception {
+        
TestRegion.getCache().getTombstoneService().forceBatchExpirationForTests(10);
+        return null;
+      }
+    });
+  }
+  
+  private void checkClientReceivedGC(VM vm) {
+    vm.invoke(new SerializableCallable("check that GC happened") {
+      public Object call() throws Exception {
+        WaitCriterion wc = new WaitCriterion() {
+          
+          @Override
+          public boolean done() {
+            LogWriterUtils.getLogWriter().info("tombstone count = " + 
TestRegion.getTombstoneCount());
+            LogWriterUtils.getLogWriter().info("region size = " + 
TestRegion.size());
+            return TestRegion.getTombstoneCount() == 0 && TestRegion.size() == 
0;
+          }
+          
+          @Override
+          public String description() {
+            return "waiting for garbage collection to occur";
+          }
+        };
+        Wait.waitForCriterion(wc, 60000, 2000, true);
+        return null;
+      }
+    });
+  }
+        
+  private void checkServerQueuesEmpty(VM vm) {
+    vm.invoke(new SerializableCallable("check that client queues are properly 
cleared of old ClientTombstone messages") {
+
+      public Object call() throws Exception {
+        WaitCriterion wc = new WaitCriterion() {
+//          boolean firstTime = true;
+          
+          @Override
+          public boolean done() {
+            CacheClientNotifier singleton = CacheClientNotifier.getInstance();
+            Collection<CacheClientProxy> proxies = 
singleton.getClientProxies();
+//            boolean first = firstTime;
+//            firstTime = false;
+            for (CacheClientProxy proxy: proxies) {
+              if (!proxy.isPrimary()) {  // bug #50683 only applies to backup 
queues
+                int size = proxy.getQueueSize();
+                if (size > 0) {
+//                  if (first) {
+//                    ((LocalRegion)proxy.getHARegion()).dumpBackingMap();
+//                  }
+                  LogWriterUtils.getLogWriter().info("queue size ("+size+") is 
still > 0 for " + proxy.getProxyID()); 
+                  return false;
+                }
+              }
+            }
+            // also ensure that server regions have been cleaned up
+            int regionEntryCount = TestRegion.getRegionMap().size();
+            if (regionEntryCount > 0) {
+              LogWriterUtils.getLogWriter().info("TestRegion has unexpected 
entries - all should have been GC'd but we have " + regionEntryCount);
+              TestRegion.dumpBackingMap();
+              return false;
+            }
+            return true;
+          }
+          
+          @Override
+          public String description() {
+            return "waiting for queue removal messages to clear client queues";
+          }
+        };
+        Wait.waitForCriterion(wc, 60000, 2000, true);
+        return null;
+      }
+    });
+  }
+
+
+  private void checkClientDoesNotReceiveGC(VM vm) {
+    vm.invoke(new SerializableCallable("check that GC did not happen") {
+      public Object call() throws Exception {
+        if (TestRegion.getTombstoneCount() == 0) {
+          LogWriterUtils.getLogWriter().warning("region has no tombstones");
+//          TestRegion.dumpBackingMap();
+          throw new AssertionError("expected to find tombstones but region is 
empty");
+        }
+        return null;
+      }
+    });
+  }
+        
+        
+  private int createServerRegion(VM vm, final String regionName, final boolean 
replicatedRegion) {
+    SerializableCallable createRegion = new SerializableCallable() {
+      public Object call() throws Exception {
+//        TombstoneService.VERBOSE = true;
+        AttributesFactory af = new AttributesFactory();
+        if (replicatedRegion) {
+          af.setScope(Scope.DISTRIBUTED_ACK);
+          af.setDataPolicy(DataPolicy.REPLICATE);
+        } else {
+          af.setDataPolicy(DataPolicy.PARTITION);
+          af.setPartitionAttributes((new 
PartitionAttributesFactory()).setTotalNumBuckets(2).create());
+        }
+        TestRegion = (LocalRegion)createRootRegion(regionName, af.create());
+
+        CacheServer server = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailableTCPPort();
+        server.setPort(port);
+        server.start();
+        return port;
+      }
+    };
+
+    return (Integer) vm.invoke(createRegion);
+  }
+
+
+  private void createClientRegion(final VM vm,
+                                  final String regionName,
+                                  final int port,
+                                  final boolean ccEnabled, final 
ClientRegionShortcut clientRegionShortcut) {
+    SerializableCallable createRegion = new SerializableCallable() {
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port);
+        cf.setPoolSubscriptionEnabled(true);
+        cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
+        ClientCache cache = getClientCache(cf);
+        ClientRegionFactory crf = 
cache.createClientRegionFactory(clientRegionShortcut);
+        crf.setConcurrencyChecksEnabled(ccEnabled);
+        TestRegion = (LocalRegion)crf.create(regionName);
+        TestRegion.registerInterestRegex(".*", 
InterestResultPolicy.KEYS_VALUES, false, true);
+        return null;
+      }
+    };
+    vm.invoke(createRegion);
+  }
+
+  // For durable client QRM testing we need a backup queue (redundancy=1) and
+  // durable attributes.  We also need to invoke readyForEvents()
+  private void createDurableClientRegion(final VM vm, final String regionName,
+      final int port1, final int port2, final boolean ccEnabled) {
+    SerializableCallable createRegion = new SerializableCallable() {
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1);
+        cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2);
+        cf.setPoolSubscriptionEnabled(true);
+        cf.setPoolSubscriptionRedundancy(1);
+        // bug #50683 - secondary durable queue retains all GC messages
+        cf.set(DURABLE_CLIENT_ID, "" + vm.getPid());
+        cf.set(DURABLE_CLIENT_TIMEOUT, "" + 200);
+        cf.set(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
+        ClientCache cache = getClientCache(cf);
+        ClientRegionFactory crf = 
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+        crf.setConcurrencyChecksEnabled(ccEnabled);
+        TestRegion = (LocalRegion)crf.create(regionName);
+        TestRegion.registerInterestRegex(".*", 
InterestResultPolicy.KEYS_VALUES, true, true);
+        cache.readyForEvents();
+        return null;
+      }
+    };
+    vm.invoke(createRegion);
+  }
+
+  private static class RecordingCacheListener extends CacheListenerAdapter {
+    List<EntryEvent> events = new ArrayList<EntryEvent>();
+
+    @Override
+    public void afterCreate(final EntryEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void afterDestroy(final EntryEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void afterInvalidate(final EntryEvent event) {
+      events.add(event);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent event) {
+      events.add(event);
+    }
+  }
+
+}

Reply via email to