http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
new file mode 100644
index 0000000..a1aec80
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java
@@ -0,0 +1,962 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.Scope;
+import 
com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import 
com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+/**
+ * @since GemFire 7.0.1
+ */
+@Category(DistributedTest.class)
+public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {
+
  /** Name of the region created on both WAN sites by every test. */
  protected static final String regionName = "testRegion";
  /** Per-VM cache instance; set by createCache(), torn down by closeCache(). */
  protected static Cache cache;
  /** Ignored-exception markers registered in createCache() and removed in closeCache(). */
  private static Set<IgnoredException> expectedExceptions = new HashSet<IgnoredException>();
+
+  @Override
+  public final void preTearDown() throws Exception {
+    closeCache();
+    Invoke.invokeInEveryVM(new SerializableRunnable() { public void run() {
+      closeCache();
+     } });
+  }
+  
  /**
   * WAN round-trip over a manually started SERIAL gateway sender and a
   * partitioned region: puts an entry on site 1, applies a crafted update event
   * carrying a version tag marked remote with a fresh timestamp, and verifies
   * that both the local version stamp and the stamp replicated to site 2 pick
   * up that timestamp.
   */
  @Test
  public void testUpdateVersionAfterCreateWithSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm1 = host.getVM(1); // server2 site1

    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    final String key = "key-1";

    // Site 1: locator, cache, serial sender "ln1" (manual start), PR wired to it.
    Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));

    // Site 2: remote locator + receiver, and the same PR on two members.
    Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
    Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
    vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {

      @Override
      public Object call() throws CacheException {
        Cache cache = CacheFactory.getAnyInstance();
        Region region = cache.getRegion(regionName);
        assertTrue(region instanceof PartitionedRegion);

        region.put(key, "value-1");
        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        // Create a duplicate entry version tag from stamp with newer
        // time-stamp.
        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
        VersionTag tag = VersionTag.create(memberId);

        // One less than the stamped version, so the applied update increments
        // back to the same entry version (asserted below).
        int entryVersion = stamp.getEntryVersion()-1;
        int dsid = stamp.getDistributedSystemId();
        long time = System.currentTimeMillis();

        tag.setEntryVersion(entryVersion);
        tag.setDistributedSystemId(dsid);
        tag.setVersionTimeStamp(time);
        tag.setIsRemoteForTesting(); // make the tag look like it arrived from a remote member

        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
            entry.getKey(), "value-2");

        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

        // Verify the new stamp
        entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        stamp = regionEntry.getVersionStamp();
        assertEquals(
            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
            time, stamp.getVersionTimeStamp());
        assertEquals(++entryVersion, stamp.getEntryVersion());
        assertEquals(dsid, stamp.getDistributedSystemId());

        return stamp.asVersionTag();
      }
    });

    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {

      @Override
      public Object call() throws Exception {

        Cache cache = CacheFactory.getAnyInstance();
        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);

        // wait for entry to be received
        WaitCriterion wc = new WaitCriterion() {
          public boolean done() {
            Entry<?,?> entry = null;
            try {
              entry = region.getDataStore().getEntryLocally(0, key, false, false);
            } catch (EntryNotFoundException e) {
              // expected
            } catch (ForceReattemptException e) {
              // expected
            } catch (PRLocallyDestroyedException e) {
              throw new RuntimeException("unexpected exception", e);
            }
            if (entry != null) {
              LogWriterUtils.getLogWriter().info("found entry " + entry);
            }
            return (entry != null);
          }

          public String description() {
            return "Expected "+key+" to be received on remote WAN site";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        // Wait for the crafted timestamp (not just the entry) to arrive.
        wc = new WaitCriterion() {
          public boolean done() {
            Entry entry = region.getEntry(key);
            assertTrue(entry instanceof EntrySnapshot);
            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
          }
          public String description() {
            return "waiting for timestamp to be updated";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        Entry entry = region.getEntry(key);
        assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        return stamp.asVersionTag();
      }
    });

    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
  }
+
  /**
   * Same scenario as {@link #testUpdateVersionAfterCreateWithSerialSender} but
   * over a REPLICATED (distributed) region instead of a partitioned one: the
   * crafted remote version tag's timestamp must stick locally and propagate to
   * the remote WAN site.
   */
  @Test
  public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm1 = host.getVM(1); // server2 site1

    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    final String key = "key-1";

    // Site 1: locator, cache, serial sender "ln1", replicated region wired to it.
    Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));

    // Site 2: remote locator + receiver, replicated region on two members.
    Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
    Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));

    vm2.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));
    vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort ));
    vm3.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));

    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {

      @Override
      public Object call() throws CacheException {
        Cache cache = CacheFactory.getAnyInstance();
        Region region = cache.getRegion(regionName);
        assertTrue(region instanceof DistributedRegion);

        region.put(key, "value-1");
        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof NonTXEntry); // DRs expose NonTXEntry, not EntrySnapshot
        RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        // Create a duplicate entry version tag from stamp with newer
        // time-stamp.
        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
        VersionTag tag = VersionTag.create(memberId);

        // One less than the stamped version, so the applied update increments
        // back to the same entry version (asserted below).
        int entryVersion = stamp.getEntryVersion()-1;
        int dsid = stamp.getDistributedSystemId();
        long time = System.currentTimeMillis();

        tag.setEntryVersion(entryVersion);
        tag.setDistributedSystemId(dsid);
        tag.setVersionTimeStamp(time);
        tag.setIsRemoteForTesting(); // make the tag look like it arrived from a remote member

        EntryEventImpl event = createNewEvent((DistributedRegion) region, tag,
            entry.getKey(), "value-2");

        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

        // Verify the new stamp
        entry = region.getEntry(key);
        assertTrue(entry instanceof NonTXEntry);
        regionEntry = ((NonTXEntry) entry).getRegionEntry();

        stamp = regionEntry.getVersionStamp();
        assertEquals(
            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
            time, stamp.getVersionTimeStamp());
        assertEquals(entryVersion+1, stamp.getEntryVersion());
        assertEquals(dsid, stamp.getDistributedSystemId());

        return stamp.asVersionTag();
      }
    });

    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {

      @Override
      public Object call() throws Exception {

        Cache cache = CacheFactory.getAnyInstance();
        final Region region = cache.getRegion(regionName);

        // wait for entry to be received
        WaitCriterion wc = new WaitCriterion() {
          public boolean done() {
            return (region.getEntry(key) != null);
          }

          public String description() {
            return "Expected key-1 to be received on remote WAN site";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        // Wait for the crafted timestamp (not just the entry) to arrive.
        wc = new WaitCriterion() {
          public boolean done() {
            Entry entry = region.getEntry(key);
            assertTrue(entry instanceof NonTXEntry);
            RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
          }
          public String description() {
            return "waiting for timestamp to be updated";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof NonTXEntry);
        RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        return stamp.asVersionTag();
      }
    });

    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
  }
+
  /**
   * Same scenario as {@link #testUpdateVersionAfterCreateWithSerialSender} but
   * the WAN queue uses a PARALLEL gateway sender: the crafted remote version
   * tag's timestamp must stick locally and propagate to the remote WAN site.
   */
  @Test
  public void testUpdateVersionAfterCreateWithParallelSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm1 = host.getVM(1); // server2 site1

    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    // Site 1: locator, cache, PARALLEL sender "ln1", PR wired to it.
    Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));

    final String key = "key-1";

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, true, 10, 1, false, false, null, true ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));

    // Site 2: remote locator + receiver, same PR on two members.
    Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
    Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {

      @Override
      public Object call() throws CacheException {
        Cache cache = CacheFactory.getAnyInstance();
        Region region = cache.getRegion(regionName);
        assertTrue(region instanceof PartitionedRegion);

        region.put(key, "value-1");
        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        // Create a duplicate entry version tag from stamp with newer
        // time-stamp.
        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
        VersionTag tag = VersionTag.create(memberId);

        // One less than the stamped version, so the applied update increments
        // back to the same entry version (asserted below).
        int entryVersion = stamp.getEntryVersion()-1;
        int dsid = stamp.getDistributedSystemId();
        long time = System.currentTimeMillis();

        tag.setEntryVersion(entryVersion);
        tag.setDistributedSystemId(dsid);
        tag.setVersionTimeStamp(time);
        tag.setIsRemoteForTesting(); // make the tag look like it arrived from a remote member

        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
            entry.getKey(), "value-2");

        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

        // Verify the new stamp
        entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        stamp = regionEntry.getVersionStamp();
        assertEquals(
            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
            time, stamp.getVersionTimeStamp());
        assertEquals(++entryVersion, stamp.getEntryVersion());
        assertEquals(dsid, stamp.getDistributedSystemId());

        return stamp.asVersionTag();
      }
    });

    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {

      @Override
      public Object call() throws Exception {

        Cache cache = CacheFactory.getAnyInstance();
        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);

        // wait for entry to be received
        WaitCriterion wc = new WaitCriterion() {
          public boolean done() {
            Entry<?,?> entry = null;
            try {
              entry = region.getDataStore().getEntryLocally(0, key, false, false);
            } catch (EntryNotFoundException e) {
              // expected
            } catch (ForceReattemptException e) {
              // expected
            } catch (PRLocallyDestroyedException e) {
              throw new RuntimeException("unexpected exception", e);
            }
            if (entry != null) {
              LogWriterUtils.getLogWriter().info("found entry " + entry);
            }
            return (entry != null);
          }

          public String description() {
            return "Expected key-1 to be received on remote WAN site";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        // Wait for the crafted timestamp (not just the entry) to arrive.
        wc = new WaitCriterion() {
          public boolean done() {
            Entry entry = region.getEntry(key);
            assertTrue(entry instanceof EntrySnapshot);
            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
          }
          public String description() {
            return "waiting for timestamp to be updated";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        return stamp.asVersionTag();
      }
    });

    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
  }
+
  /**
   * Same scenario as {@link #testUpdateVersionAfterCreateWithSerialSender} but
   * the WAN queue uses a CONCURRENT serial gateway sender (dispatcher
   * threads = 2): the crafted remote version tag's timestamp must stick
   * locally and propagate to the remote WAN site.
   */
  @Test
  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm1 = host.getVM(1); // server2 site1

    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    // Site 1: locator, cache, concurrent serial sender "ln1", PR wired to it.
    Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 ));

    final String key = "key-1";

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort ));
    vm0.invoke(() -> UpdateVersionDUnitTest.createConcurrentSender( "ln1", 2, false, 10, 2, false, false, null, true, 2 ));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" ));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" ));

    // Site 2: remote locator + receiver, same PR on two members.
    Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort ));
    Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort ));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort ));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {

      @Override
      public Object call() throws CacheException {
        Cache cache = CacheFactory.getAnyInstance();
        Region region = cache.getRegion(regionName);
        assertTrue(region instanceof PartitionedRegion);

        region.put(key, "value-1");
        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        // Create a duplicate entry version tag from stamp with newer
        // time-stamp.
        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
        VersionTag tag = VersionTag.create(memberId);

        // One less than the stamped version, so the applied update increments
        // back to the same entry version (asserted below).
        int entryVersion = stamp.getEntryVersion()-1;
        int dsid = stamp.getDistributedSystemId();
        long time = System.currentTimeMillis();

        tag.setEntryVersion(entryVersion);
        tag.setDistributedSystemId(dsid);
        tag.setVersionTimeStamp(time);
        tag.setIsRemoteForTesting(); // make the tag look like it arrived from a remote member

        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
            entry.getKey(), "value-2");

        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

        // Verify the new stamp
        entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        stamp = regionEntry.getVersionStamp();
        assertEquals(
            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
            time, stamp.getVersionTimeStamp());
        assertEquals(++entryVersion, stamp.getEntryVersion());
        assertEquals(dsid, stamp.getDistributedSystemId());

        return stamp.asVersionTag();
      }
    });

    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {

      @Override
      public Object call() throws Exception {

        Cache cache = CacheFactory.getAnyInstance();
        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);

        // wait for entry to be received
        WaitCriterion wc = new WaitCriterion() {
          public boolean done() {
            Entry<?,?> entry = null;
            try {
              entry = region.getDataStore().getEntryLocally(0, key, false, false);
            } catch (EntryNotFoundException e) {
              // expected
            } catch (ForceReattemptException e) {
              // expected
            } catch (PRLocallyDestroyedException e) {
              throw new RuntimeException("unexpected exception", e);
            }
            if (entry != null) {
              LogWriterUtils.getLogWriter().info("found entry " + entry);
            }
            return (entry != null);
          }

          public String description() {
            return "Expected key-1 to be received on remote WAN site";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        // Wait for the crafted timestamp (not just the entry) to arrive.
        wc = new WaitCriterion() {
          public boolean done() {
            Entry entry = region.getEntry(key);
            assertTrue(entry instanceof EntrySnapshot);
            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
          }
          public String description() {
            return "waiting for timestamp to be updated";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        Entry entry = region.getEntry(key);
        assertTrue(entry instanceof EntrySnapshot);
        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();

        VersionStamp stamp = regionEntry.getVersionStamp();

        return stamp.asVersionTag();
      }
    });

    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
  }
+
  /**
   * Builds an UPDATE event that carries the supplied (crafted) version tag so
   * that applying it via basicUpdate exercises the version-stamp update path
   * as if the event had arrived from another member.
   *
   * @param region the target region (PR or DR) the event will be applied to
   * @param tag    the version tag to attach (marked remote by the callers)
   * @param key    the entry key
   * @param value  the new value for the entry
   * @return a fully populated event ready for LocalRegion.basicUpdate
   */
  private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) {
    VersionTagHolder updateEvent = new VersionTagHolder(tag);
    updateEvent.setOperation(Operation.UPDATE);
    updateEvent.setRegion(region);
    if (region instanceof PartitionedRegion) {
      // PRs derive bucket-routing KeyInfo from the key.
      updateEvent.setKeyInfo(((PartitionedRegion)region).getKeyInfo(key));
    } else {
      updateEvent.setKeyInfo(new KeyInfo(key, value, null));
    }
    updateEvent.setNewValue(value);
    updateEvent.setGenerateCallbacks(true);
    updateEvent.distributedMember = region.getSystem().getDistributedMember();
    updateEvent.setNewEventId(region.getSystem());
    return updateEvent;
  }
+
+  /*
+   * Helper Methods
+   */
+
  /**
   * Connects this VM to the distributed system via the given locator port and
   * creates the static test cache. Also registers ignored-exception markers
   * for benign WAN startup noise ("could not get remote locator information
   * for remote site", "Pool ln1 is not available") so the dunit log scan does
   * not fail; closeCache() emits the matching remove markers.
   */
  private static void createCache(Integer locPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    IgnoredException ex = new IgnoredException("could not get remote locator information for remote site");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
    ex = new IgnoredException("Pool ln1 is not available");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
  }
+  
+  private static void closeCache() {
+    if (cache != null && !cache.isClosed()) {
+      for (IgnoredException expectedException: expectedExceptions) {
+        cache.getLogger().info(expectedException.getRemoveMessage());
+      }
+      expectedExceptions.clear();
+      cache.getDistributedSystem().disconnect();
+      cache.close();
+    }
+    cache = null;
+  }
+
+  public static void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+      boolean isManualStart) {
+    File persistentDirectory = new File(dsName + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+    if (isParallel) {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setParallel(true);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory) gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      } else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      gateway.create(dsName, remoteDsId);
+
+    } else {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory) gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      } else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.create(dsName, remoteDsId);
+    }
+  }
+
+  public static void createPartitionedRegion(String regionName, String 
senderIds, Integer redundantCopies, Integer totalNumBuckets){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pFact = new PartitionAttributesFactory();
+    pFact.setTotalNumBuckets(totalNumBuckets);
+    pFact.setRedundantCopies(redundantCopies);
+    pFact.setRecoveryDelay(0);
+    fact.setPartitionAttributes(pFact.create());
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  public static void createReplicatedRegion(String regionName, String 
senderIds){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.REPLICATE);
+    fact.setScope(Scope.DISTRIBUTED_ACK);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  public static void waitForSenderRunningState(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    final GatewaySender sender = getGatewaySenderById(senders, senderId);
+    
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (sender != null && sender.isRunning()) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected sender isRunning state to be true but is false";
+      }
+    };
+    Wait.waitForCriterion(wc, 300000, 500, true);
+  }
+
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+    props.setProperty(LOCATORS, "localhost[" + port + "]");
+    props.setProperty(START_LOCATOR, "localhost[" + port + 
"],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    test.getSystem(props);
+    return port;
+  }
+
+  public static void createConcurrentSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
+    File persistentDirectory = new File(dsName 
+"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File [] dirs1 = new File[] {persistentDirectory};
+    
+    if(isParallel) {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setParallel(true);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new 
MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      if(isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      gateway.create(dsName, remoteDsId);
+      
+    }else {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new 
MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if(isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setDispatcherThreads(concurrencyLevel);
+      gateway.create(dsName, remoteDsId);
+    }
+  }
+
+  public static int createReceiver(int locPort) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);    
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Test " + test.getName() + " failed to start GatewayReceiver on 
port " + port);
+    }
+    return port;
+  }
+
+  public static void startSender(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    sender.start();
+  }
+
+  protected static class MyLocatorCallback extends 
LocatorDiscoveryCallbackAdapter {
+
+    private final Set discoveredLocators = new HashSet();
+
+    private final Set removedLocators = new HashSet();
+
+    @Override
+    public synchronized void locatorsDiscovered(List locators) {
+      discoveredLocators.addAll(locators);
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void locatorsRemoved(List locators) {
+      removedLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public boolean waitForDiscovery(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(discoveredLocators, locator, time);
+    }
+
+    public boolean waitForRemove(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(removedLocators, locator, time);
+    }
+
+    private synchronized boolean waitFor(Set set, InetSocketAddress locator,
+        long time) throws InterruptedException {
+      long remaining = time;
+      long endTime = System.currentTimeMillis() + time;
+      while (!set.contains(locator) && remaining >= 0) {
+        wait(remaining);
+        remaining = endTime - System.currentTimeMillis();
+      }
+      return set.contains(locator);
+    }
+
+    public synchronized Set getDiscovered() {
+      return new HashSet(discoveredLocators);
+    }
+
+    public synchronized Set getRemoved() {
+      return new HashSet(removedLocators);
+    }
+  }
+
+  private static GatewaySender getGatewaySenderById(Set<GatewaySender> 
senders, String senderId) {
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        return s;
+      }
+    }
+    //if none of the senders matches with the supplied senderId, return null
+    return null;
+  }
+
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+    props.setProperty(LOCATORS, "localhost[" + port + "]");
+    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    props.setProperty(START_LOCATOR, "localhost[" + port + 
"],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
new file mode 100755
index 0000000..96d441c
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class CacheClientNotifierDUnitTest extends WANTestBase {
+
+  private static final int NUM_KEYS = 10;
+  
+  private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int 
capacity,
+      final String policy, final String diskStoreName) {
+    final int serverPort = 
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+    SerializableRunnable createCacheServer = new SerializableRunnable() {
+      @Override
+      public void run() throws Exception {
+        CacheServerImpl server = (CacheServerImpl)cache.addCacheServer();
+        server.setPort(serverPort);
+        if (withCSC) {
+          if (diskStoreName != null) {
+            DiskStore ds = cache.findDiskStore(diskStoreName);
+            if(ds == null) {
+              ds = cache.createDiskStoreFactory().create(diskStoreName);
+            }
+          }
+          ClientSubscriptionConfig csc = server.getClientSubscriptionConfig();
+          csc.setCapacity(capacity);
+          csc.setEvictionPolicy(policy);
+          csc.setDiskStoreName(diskStoreName);
+          server.setHostnameForClients("localhost");
+          //server.setGroups(new String[]{"serv"});
+        }
+        try {
+          server.start();
+        } catch (IOException e) {
+          com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server 
", e);
+        }
+      }
+    };
+    vm.invoke(createCacheServer);
+    return serverPort;
+  }
+
+  private void checkCacheServer(VM vm, final int serverPort, final boolean 
withCSC, final int capacity) {
+    SerializableRunnable checkCacheServer = new SerializableRunnable() {
+
+      @Override
+      public void run() throws Exception {
+        List<CacheServer> cacheServers = 
((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver();
+        CacheServerImpl server = null;
+        for (CacheServer cs:cacheServers) {
+          if (cs.getPort() == serverPort) {
+            server = (CacheServerImpl)cs;
+            break;
+          }
+        }
+        assertNotNull(server);
+        CacheClientNotifier ccn = 
server.getAcceptor().getCacheClientNotifier();
+        HAContainerRegion haContainer = 
(HAContainerRegion)ccn.getHaContainer();
+        if (server.getAcceptor().isGatewayReceiver()) {
+          assertNull(haContainer);
+          return;
+        }
+        Region internalRegion = haContainer.getMapForTest();
+        RegionAttributes ra = internalRegion.getAttributes();
+        EvictionAttributes ea = ra.getEvictionAttributes();
+        if (withCSC) {
+          assertNotNull(ea);
+          assertEquals(capacity, ea.getMaximum());
+          assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction());
+        } else {
+          assertNull(ea);
+        }
+      }
+    };
+    vm.invoke(checkCacheServer);
+  }
+
+  public static void closeACacheServer(final int serverPort) {
+    List<CacheServer> cacheServers = cache.getCacheServers();
+    CacheServerImpl server = null;
+    for (CacheServer cs:cacheServers) {
+      if (cs.getPort() == serverPort) {
+        server = (CacheServerImpl)cs;
+        break;
+      }
+    }
+    assertNotNull(server);
+    server.stop();
+  }
+
+  private void verifyRegionSize(VM vm, final int expect) {
+    SerializableRunnable verifyRegionSize = new SerializableRunnable() {
+      @Override
+      public void run() throws Exception {
+        final Region region = cache.getRegion(getTestMethodName() + "_PR");
+
+        Wait.waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            return region.size() == expect; 
+          }
+          public String description() {
+            return null;
+          }
+        }, 60000, 100, false);
+        assertEquals(expect, region.size());
+      }
+    };
+    vm.invoke(verifyRegionSize);
+  }
+  
+  /**
+   * The test will start several cache servers, including gateway receivers.
+   * Shutdown them and verify the CacheClientNotifier for each server is 
correct
+   */
+  @Test
+  public void testNormalClient2MultipleCacheServer() throws Exception {
+    doMultipleCacheServer(false);
+  }
+
+  public void doMultipleCacheServer(boolean durable) throws Exception {
+    /* test scenario: */
+    /* create 1 GatewaySender on vm0 */
+    /* create 1 GatewayReceiver on vm1 */
+    /* create 2 cache servers on vm1, one with overflow. */
+    /* verify if the cache server2 still has the overflow attributes */
+    /* create 1 cache client1 on vm2 to register interest on cache server1 */
+    /* create 1 cache client2 on vm3 to register interest on cache server1 */
+    /* do some puts to GatewaySender on vm0 */
+    
+    // create sender at ln
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    
+    // create receiver and cache servers will be at ny
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    vm1.invoke(() -> WANTestBase.createCache( nyPort ));
+    int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver());
+    checkCacheServer(vm1, receiverPort, false, 0);
+    
+    // create PR for receiver
+    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( 
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    
+    // create cache server1 with overflow
+    int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", 
"DEFAULT");
+    checkCacheServer(vm1, serverPort, true, 3);
+    
+    // create cache server 2
+    final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, 
null);
+    // Currently, only the first cache server's overflow attributes will take 
effect
+    // It will be enhanced in GEODE-1102
+    checkCacheServer(vm1, serverPort2, true, 3);
+    
LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2);
+    
+    vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", 
getTestMethodName() + "_PR", "123", durable));
+    vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", 
getTestMethodName() + "_PR", "124", durable));
+
+    vm0.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, 
false, false, null, true ));
+    vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( 
getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm0.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS 
));
+    
+    /* verify */
+    verifyRegionSize(vm0, NUM_KEYS);
+    verifyRegionSize(vm1, NUM_KEYS);
+    verifyRegionSize(vm3, NUM_KEYS);
+    verifyRegionSize(vm2, NUM_KEYS);
+
+    // close a cache server, then re-test
+    vm1.invoke(() -> closeACacheServer(serverPort2));
+
+    vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 
NUM_KEYS*2 ));
+
+    /* verify */
+    verifyRegionSize(vm0, NUM_KEYS*2);
+    verifyRegionSize(vm1, NUM_KEYS*2);
+    verifyRegionSize(vm3, NUM_KEYS*2);
+    verifyRegionSize(vm2, NUM_KEYS*2);
+    
+    disconnectAllFromDS();
+  }
+
+  public static void createClientWithLocator(int port0,String host,
+      String regionName, String clientId, boolean isDurable) {
+    WANTestBase test = new WANTestBase();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "");
+    if (isDurable) {
+      props.setProperty(DURABLE_CLIENT_ID, clientId);
+      props.setProperty(DURABLE_CLIENT_TIMEOUT, "" + 200);
+    }
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+
+    assertNotNull(cache);
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    Pool p;
+    try {
+      p = PoolManager.createFactory().addLocator(host, port0)
+          .setPingInterval(250).setSubscriptionEnabled(true)
+          .setSubscriptionRedundancy(-1).setReadTimeout(2000)
+          .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
+          .setRetryAttempts(3).create(regionName);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.setPoolName(p.getName());
+    factory.setDataPolicy(DataPolicy.NORMAL);
+    RegionAttributes attrs = factory.create();
+    region = cache.createRegion(regionName, attrs);
+    region.registerInterest("ALL_KEYS");
+    assertNotNull(region);
+    if (isDurable) {
+      cache.readyForEvents();
+    }
+    LogWriterUtils.getLogWriter().info(
+        "Distributed Region " + regionName + " created Successfully :"
+            + region.toString() + " in a "+(isDurable?"durable":"")+" client");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
new file mode 100755
index 0000000..1d4e947
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+import com.jayway.awaitility.Awaitility;
+
+@Category(DistributedTest.class)
+public class Simple2CacheServerDUnitTest extends WANTestBase {
+  private static final int NUM_KEYS = 10;
+  static int afterPrimaryCount = 0;
+  static int afterProxyReinitialized = 0;
+  
+  public Simple2CacheServerDUnitTest() {
+    super();
+  }
+  
+  @Test
+  public void testNormalClient2MultipleCacheServer() throws Exception {
+    doMultipleCacheServer(false);
+  }
+  
+  public void doMultipleCacheServer(boolean durable) throws Exception {
+    Integer lnPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    vm1.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( 
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    int serverPort = vm1.invoke(() -> WANTestBase.createCacheServer());
+    int serverPort2 = vm1.invoke(() -> WANTestBase.createCacheServer());
+
+    if (durable) {
+      vm1.invoke(() -> setCacheClientProxyTestHook());
+    } else {
+      vm2.invoke(() -> setClientServerObserver());
+    }
+    vm2.invoke(() -> 
CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost", 
getTestMethodName() + "_PR" , "123", durable));
+
+    vm0.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( 
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    int serverPort3 = vm0.invoke(() -> WANTestBase.createCacheServer());
+    
+    if (durable) {
+      vm1.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
+    } else {
+      vm2.invoke(() -> checkResultAndUnsetClientServerObserver());
+    }
+    Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return 
checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1); });
+    
+    // close the current primary cache server, then re-test
+    int serverPortAtVM1 = vm1.invoke(()-> findCacheServerForPrimaryProxy());
+    if (serverPortAtVM1 != 0) {
+      vm1.invoke(()-> 
CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1));
+      LogService.getLogger().info("Closed cache server on 
vm1:"+serverPortAtVM1);
+      Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return 
checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1); });
+    } else {
+      vm0.invoke(()-> 
CacheClientNotifierDUnitTest.closeACacheServer(serverPort3));
+      LogService.getLogger().info("Closed cache server on vm0:"+serverPort3);
+      assertTrue(checkProxyIsPrimary(vm1));
+    }
+    disconnectAllFromDS();
+  }
+  
+  private static int findCacheServerForPrimaryProxy() {
+    List<CacheServer> cacheServers = 
((GemFireCacheImpl)cache).getCacheServers();
+    CacheServerImpl server = null;
+    for (CacheServer cs:cacheServers) {
+      server = (CacheServerImpl)cs;
+      long acceptorId = server.getAcceptor().getAcceptorId();
+      for (CacheClientProxy 
proxy:CacheClientNotifier.getInstance().getClientProxies()) {
+        if (proxy.isPrimary() == false) {
+          continue;
+        }
+        if (proxy.getAcceptorId() == acceptorId) {
+          LogService.getLogger().info("Found cache server "+server+" for the 
primary proxy "+proxy);
+          return server.getPort();
+        }
+      }
+    }
+    return 0;
+  }
+  
+  public static void setClientServerObserver()
+  {
+    PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = true;
+    ClientServerObserverHolder
+    .setInstance(new ClientServerObserverAdapter() {
+      public void afterPrimaryIdentificationFromBackup(ServerLocation 
primaryEndpoint)
+      {
+        LogService.getLogger().info("After primary is set");
+        afterPrimaryCount++;
+      }
+    });
+  }
+
+  public static void checkResultAndUnsetClientServerObserver()
+  {
+    PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+    // setPrimary only happened once
+    assertEquals(1, afterPrimaryCount);
+    afterPrimaryCount = 0;
+  }
+
+  public static void setCacheClientProxyTestHook()
+  {
+    CacheClientProxy.testHook = new CacheClientProxy.TestHook() {
+      @Override
+      public void doTestHook(String spot) {
+        if (spot.equals("CLIENT_RECONNECTED")) {
+          afterProxyReinitialized++;
+        }
+      }
+    };
+  }
+
+  public static void checkResultAndUnsetCacheClientProxyTestHook()
+  {
+    // Reinitialize only happened once
+    CacheClientProxy.testHook = null;
+    assertEquals(1, afterProxyReinitialized);
+    afterProxyReinitialized = 0;
+  }
+  
+  private boolean checkProxyIsPrimary(VM vm) {
+    SerializableCallable checkProxyIsPrimary = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        final CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+        Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return 
(ccn.getClientProxies().size() == 1); }); 
+        
+        Iterator iter_prox = ccn.getClientProxies().iterator();
+        CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
+        return proxy.isPrimary();
+      }
+    };
+    return (Boolean)vm.invoke(checkProxyIsPrimary);
+  }
+}

Reply via email to