http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java new file mode 100644 index 0000000..a1aec80 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/UpdateVersionDUnitTest.java @@ -0,0 +1,962 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.StringTokenizer; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.EntryNotFoundException; +import com.gemstone.gemfire.cache.Operation; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.Region.Entry; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; +import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException; +import com.gemstone.gemfire.internal.cache.versions.VersionSource; +import com.gemstone.gemfire.internal.cache.versions.VersionStamp; +import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import 
com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.Invoke;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.SerializableCallable;
import com.gemstone.gemfire.test.dunit.SerializableRunnable;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

/**
 * Distributed tests that put an entry on one WAN site, apply an update carrying a hand-built
 * {@link VersionTag} with a newer time-stamp, and then check that the entry on the remote WAN
 * site ends up with the same version time-stamp.
 *
 * <p>Each test uses vm0 as the sending site (distributed-system id 1) and vm2/vm3 as the
 * receiving site (distributed-system id 2). The tests differ only in the kind of gateway sender
 * (serial, parallel, concurrent serial) and the kind of region (partitioned, replicated).
 *
 * @since GemFire 7.0.1
 */
@Category(DistributedTest.class)
public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase {

  protected static final String regionName = "testRegion";
  protected static Cache cache;
  private static Set<IgnoredException> expectedExceptions = new HashSet<IgnoredException>();

  /** Closes the cache in the controller VM and in every dunit VM. */
  @Override
  public final void preTearDown() throws Exception {
    closeCache();
    Invoke.invokeInEveryVM(new SerializableRunnable() {
      @Override
      public void run() {
        closeCache();
      }
    });
  }

  @Test
  public void testUpdateVersionAfterCreateWithSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    final String key = "key-1";

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId(1));

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache(lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender("ln1", 2, false, 10, 1, false, false, null, true));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender("ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator(2, lnPort));
    vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver(nyPort));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));
    vm3.invoke(() -> UpdateVersionDUnitTest.createCache(nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    verifyVersionUpdatePropagates(vm0, vm3, key, true);
  }

  @Test
  public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    final String key = "key-1";

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId(1));

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache(lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender("ln1", 2, false, 10, 1, false, false, null, true));

    vm0.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender("ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator(2, lnPort));
    vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver(nyPort));

    vm2.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));
    vm3.invoke(() -> UpdateVersionDUnitTest.createCache(nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, ""));

    verifyVersionUpdatePropagates(vm0, vm3, key, false);
  }

  @Test
  public void testUpdateVersionAfterCreateWithParallelSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId(1));

    final String key = "key-1";

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache(lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createSender("ln1", 2, true, 10, 1, false, false, null, true));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender("ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator(2, lnPort));
    vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver(nyPort));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    vm3.invoke(() -> UpdateVersionDUnitTest.createCache(nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    verifyVersionUpdatePropagates(vm0, vm3, key, true);
  }

  @Test
  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
    Host host = Host.getHost(0);
    VM vm0 = host.getVM(0); // server1 site1
    VM vm2 = host.getVM(2); // server1 site2
    VM vm3 = host.getVM(3); // server2 site2

    // Site 1
    Integer lnPort = (Integer) vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId(1));

    final String key = "key-1";

    vm0.invoke(() -> UpdateVersionDUnitTest.createCache(lnPort));
    vm0.invoke(() -> UpdateVersionDUnitTest.createConcurrentSender("ln1", 2, false, 10, 2, false, false, null, true, 2));

    vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1));
    vm0.invoke(() -> UpdateVersionDUnitTest.startSender("ln1"));
    vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState("ln1"));

    // Site 2
    Integer nyPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator(2, lnPort));
    vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver(nyPort));

    vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    vm3.invoke(() -> UpdateVersionDUnitTest.createCache(nyPort));
    vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1));

    verifyVersionUpdatePropagates(vm0, vm3, key, true);
  }

  /**
   * Shared body of all tests: updates {@code key} on the local site with a hand-built version
   * tag, waits for the remote site to catch up, and asserts both sites report the same version
   * time-stamp.
   *
   * @param localVm VM on the sending site in which the entry is created and updated
   * @param remoteVm VM on the receiving site that is polled for the entry
   * @param key key of the entry under test
   * @param partitioned {@code true} if {@link #regionName} is a partitioned region,
   *        {@code false} if it is a replicated region
   */
  private void verifyVersionUpdatePropagates(VM localVm, VM remoteVm, String key,
      boolean partitioned) {
    final VersionTag tag = updateEntryWithNewerVersionTag(localVm, key, partitioned);
    VersionTag remoteTag = awaitRemoteVersionTag(remoteVm, key, tag, partitioned);

    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(),
        remoteTag.getVersionTimeStamp());
  }

  /**
   * In {@code vm}, puts {@code key}, then applies an update whose version tag has the previous
   * entry version and the current wall-clock time, and verifies the entry's stamp afterwards.
   *
   * @return the version tag of the entry after the update
   */
  private VersionTag updateEntryWithNewerVersionTag(VM vm, final String key,
      final boolean partitioned) {
    return (VersionTag) vm.invoke(new SerializableCallable("Update a single entry and get its version") {

      @Override
      public Object call() throws CacheException {
        Cache localCache = CacheFactory.getAnyInstance();
        Region region = localCache.getRegion(regionName);
        if (partitioned) {
          assertTrue(region instanceof PartitionedRegion);
        } else {
          assertTrue(region instanceof DistributedRegion);
        }

        region.put(key, "value-1");
        Entry entry = region.getEntry(key);
        VersionStamp stamp = regionEntryOf(entry, partitioned).getVersionStamp();

        // Create a duplicate entry version tag from stamp with newer
        // time-stamp.
        VersionSource memberId =
            (VersionSource) localCache.getDistributedSystem().getDistributedMember();
        VersionTag tag = VersionTag.create(memberId);

        int entryVersion = stamp.getEntryVersion() - 1;
        int dsid = stamp.getDistributedSystemId();
        long time = System.currentTimeMillis();

        tag.setEntryVersion(entryVersion);
        tag.setDistributedSystemId(dsid);
        tag.setVersionTimeStamp(time);
        tag.setIsRemoteForTesting();

        EntryEventImpl event = createNewEvent((LocalRegion) region, tag, entry.getKey(), "value-2");

        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);

        // Verify the new stamp
        entry = region.getEntry(key);
        stamp = regionEntryOf(entry, partitioned).getVersionStamp();
        assertEquals(
            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
            time, stamp.getVersionTimeStamp());
        assertEquals(entryVersion + 1, stamp.getEntryVersion());
        assertEquals(dsid, stamp.getDistributedSystemId());

        return stamp.asVersionTag();
      }
    });
  }

  /**
   * In {@code vm}, waits (up to 30 seconds each) for {@code key} to arrive and then for its
   * version time-stamp to equal that of {@code tag}.
   *
   * @return the version tag of the entry as seen in {@code vm}
   */
  private VersionTag awaitRemoteVersionTag(VM vm, final String key, final VersionTag tag,
      final boolean partitioned) {
    return (VersionTag) vm.invoke(new SerializableCallable("Get timestamp from remote site") {

      @Override
      public Object call() throws Exception {
        Cache localCache = CacheFactory.getAnyInstance();
        final Region region = localCache.getRegion(regionName);

        // wait for entry to be received
        WaitCriterion wc = new WaitCriterion() {
          @Override
          public boolean done() {
            return hasEntry(region, key, partitioned);
          }

          @Override
          public String description() {
            return "Expected " + key + " to be received on remote WAN site";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        wc = new WaitCriterion() {
          @Override
          public boolean done() {
            RegionEntry regionEntry = regionEntryOf(region.getEntry(key), partitioned);
            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
          }

          @Override
          public String description() {
            return "waiting for timestamp to be updated";
          }
        };
        Wait.waitForCriterion(wc, 30000, 500, true);

        VersionStamp stamp = regionEntryOf(region.getEntry(key), partitioned).getVersionStamp();
        return stamp.asVersionTag();
      }
    });
  }

  /**
   * Reports whether {@code key} is present in {@code region} in this VM. For a partitioned
   * region the local data store is asked for bucket 0 directly; a missing entry or a
   * {@link ForceReattemptException} counts as "not there yet".
   */
  private static boolean hasEntry(Region region, Object key, boolean partitioned) {
    if (!partitioned) {
      return region.getEntry(key) != null;
    }
    Entry<?, ?> entry = null;
    try {
      entry = ((PartitionedRegion) region).getDataStore().getEntryLocally(0, key, false, false);
    } catch (EntryNotFoundException e) {
      // expected
    } catch (ForceReattemptException e) {
      // expected
    } catch (PRLocallyDestroyedException e) {
      throw new RuntimeException("unexpected exception", e);
    }
    if (entry != null) {
      LogWriterUtils.getLogWriter().info("found entry " + entry);
    }
    return entry != null;
  }

  /**
   * Asserts that {@code entry} has the wrapper class expected for the region kind
   * ({@link EntrySnapshot} for partitioned, {@link NonTXEntry} otherwise) and unwraps it.
   */
  private static RegionEntry regionEntryOf(Entry entry, boolean partitioned) {
    if (partitioned) {
      assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
      return ((EntrySnapshot) entry).getRegionEntry();
    }
    assertTrue("entry class is wrong: " + entry, entry instanceof NonTXEntry);
    return ((NonTXEntry) entry).getRegionEntry();
  }

  /**
   * Builds an UPDATE event for {@code key}/{@code value} on {@code region} that carries
   * {@code tag} as its version tag and originates from the local distributed member.
   */
  private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) {
    VersionTagHolder updateEvent = new VersionTagHolder(tag);
    updateEvent.setOperation(Operation.UPDATE);
    updateEvent.setRegion(region);
    if (region instanceof PartitionedRegion) {
      updateEvent.setKeyInfo(((PartitionedRegion) region).getKeyInfo(key));
    } else {
      updateEvent.setKeyInfo(new KeyInfo(key, value, null));
    }
    updateEvent.setNewValue(value);
    updateEvent.setGenerateCallbacks(true);
    updateEvent.distributedMember = region.getSystem().getDistributedMember();
    updateEvent.setNewEventId(region.getSystem());
    return updateEvent;
  }

  /*
   * Helper Methods
   */

  /**
   * Connects this VM to the locator on {@code locPort}, creates the static {@link #cache}, and
   * registers the ignored-exception strings that {@link #closeCache()} later removes.
   */
  private static void createCache(Integer locPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
    props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    IgnoredException ex = new IgnoredException("could not get remote locator information for remote site");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
    ex = new IgnoredException("Pool ln1 is not available");
    cache.getLogger().info(ex.getAddMessage());
    expectedExceptions.add(ex);
  }

  /**
   * Logs the remove-message of every registered ignored exception, disconnects and closes the
   * static {@link #cache} if it is open, and resets the field to {@code null}.
   */
  private static void closeCache() {
    if (cache != null && !cache.isClosed()) {
      for (IgnoredException expectedException : expectedExceptions) {
        cache.getLogger().info(expectedException.getRemoveMessage());
      }
      expectedExceptions.clear();
      cache.getDistributedSystem().disconnect();
      cache.close();
    }
    cache = null;
  }

  /**
   * Creates a gateway sender named {@code dsName} targeting {@code remoteDsId} in this VM's
   * cache. A disk store named {@code dsName} is always created and assigned to the sender;
   * {@code isPersistent} only controls whether persistence is enabled on the sender.
   */
  public static void createSender(String dsName, int remoteDsId,
      boolean isParallel, Integer maxMemory, Integer batchSize,
      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
      boolean isManualStart) {
    doCreateSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation,
        isPersistent, filter, isManualStart, null);
  }

  /**
   * Same as {@link #createSender} but additionally sets the number of dispatcher threads to
   * {@code concurrencyLevel}. As before, {@code concurrencyLevel} is applied to serial senders
   * only; it is ignored when {@code isParallel} is {@code true}.
   */
  public static void createConcurrentSender(String dsName, int remoteDsId,
      boolean isParallel, Integer maxMemory,
      Integer batchSize, boolean isConflation, boolean isPersistent,
      GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) {
    doCreateSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation,
        isPersistent, filter, isManualStart, concurrencyLevel);
  }

  /**
   * Common implementation behind {@link #createSender} and {@link #createConcurrentSender}.
   *
   * @param dispatcherThreads dispatcher thread count for a serial sender, or {@code null} to
   *        leave the factory default; ignored for parallel senders
   */
  private static void doCreateSender(String dsName, int remoteDsId,
      boolean isParallel, Integer maxMemory, Integer batchSize,
      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
      boolean isManualStart, Integer dispatcherThreads) {
    // Directory name includes time and VM number so each sender gets its own disk directory.
    File persistentDirectory = new File(dsName + "_disk_"
        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
    persistentDirectory.mkdir();
    DiskStoreFactory dsf = cache.createDiskStoreFactory();
    File[] dirs1 = new File[] {persistentDirectory};

    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
    if (isParallel) {
      gateway.setParallel(true);
    }
    gateway.setMaximumQueueMemory(maxMemory);
    gateway.setBatchSize(batchSize);
    gateway.setManualStart(isManualStart);
    ((InternalGatewaySenderFactory) gateway)
        .setLocatorDiscoveryCallback(new MyLocatorCallback());
    if (filter != null) {
      gateway.addGatewayEventFilter(filter);
    }
    if (isPersistent) {
      gateway.setPersistenceEnabled(true);
    }
    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
    gateway.setDiskStoreName(store.getName());
    gateway.setBatchConflationEnabled(isConflation);
    if (!isParallel && dispatcherThreads != null) {
      gateway.setDispatcherThreads(dispatcherThreads);
    }
    gateway.create(dsName, remoteDsId);
  }

  /**
   * Creates a partitioned region with recovery delay 0 in this VM's cache.
   *
   * @param senderIds comma-separated gateway sender ids to attach; may be {@code null} or empty
   */
  public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
    AttributesFactory fact = new AttributesFactory();
    addGatewaySenderIds(fact, senderIds);
    PartitionAttributesFactory pFact = new PartitionAttributesFactory();
    pFact.setTotalNumBuckets(totalNumBuckets);
    pFact.setRedundantCopies(redundantCopies);
    pFact.setRecoveryDelay(0);
    fact.setPartitionAttributes(pFact.create());
    Region r = cache.createRegionFactory(fact.create()).create(regionName);
    assertNotNull(r);
  }

  /**
   * Creates a replicated, distributed-ack region in this VM's cache.
   *
   * @param senderIds comma-separated gateway sender ids to attach; may be {@code null} or empty
   */
  public static void createReplicatedRegion(String regionName, String senderIds) {
    AttributesFactory fact = new AttributesFactory();
    addGatewaySenderIds(fact, senderIds);
    fact.setDataPolicy(DataPolicy.REPLICATE);
    fact.setScope(Scope.DISTRIBUTED_ACK);
    Region r = cache.createRegionFactory(fact.create()).create(regionName);
    assertNotNull(r);
  }

  /** Adds every id in the comma-separated {@code senderIds} to {@code fact}; no-op for null. */
  private static void addGatewaySenderIds(AttributesFactory fact, String senderIds) {
    if (senderIds == null) {
      return;
    }
    StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
    while (tokenizer.hasMoreTokens()) {
      fact.addGatewaySenderId(tokenizer.nextToken());
    }
  }

  /** Waits up to 300 seconds for the sender with the given id to exist and report running. */
  public static void waitForSenderRunningState(String senderId) {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    final GatewaySender sender = getGatewaySenderById(senders, senderId);

    WaitCriterion wc = new WaitCriterion() {
      @Override
      public boolean done() {
        return sender != null && sender.isRunning();
      }

      @Override
      public String description() {
        return "Expected sender isRunning state to be true but is false";
      }
    };
    Wait.waitForCriterion(wc, 300000, 500, true);
  }

  /**
   * Starts a locator for distributed system {@code dsId} in this VM, configured with the locator
   * on {@code remoteLocPort} as its remote locator.
   *
   * @return the port the new locator was started on
   */
  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + port + "]");
    props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
    props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    test.getSystem(props);
    return port;
  }

  /**
   * Connects this VM to the locator on {@code locPort}, creates the static {@link #cache}, and
   * starts a gateway receiver on a single randomly chosen port.
   *
   * @return the port the receiver listens on
   * @throws AssertionError if the receiver cannot be started; the I/O failure is the cause
   */
  public static int createReceiver(int locPort) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + locPort + "]");

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);
    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    fact.setStartPort(port);
    fact.setEndPort(port);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      // Keep the IOException as the cause instead of printing it and losing it.
      throw new AssertionError(
          "Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e);
    }
    return port;
  }

  /**
   * Starts the gateway sender with the given id in this VM's cache.
   *
   * @throws AssertionError if no sender with that id exists
   */
  public static void startSender(String senderId) {
    GatewaySender sender = getGatewaySenderById(cache.getGatewaySenders(), senderId);
    assertNotNull("No gateway sender with id " + senderId + " exists in this cache", sender);
    sender.start();
  }

  /**
   * Locator discovery callback that records every discovered and removed locator and lets
   * callers block until a particular locator shows up in either set.
   */
  protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {

    private final Set discoveredLocators = new HashSet();

    private final Set removedLocators = new HashSet();

    @Override
    public synchronized void locatorsDiscovered(List locators) {
      discoveredLocators.addAll(locators);
      notifyAll();
    }

    @Override
    public synchronized void locatorsRemoved(List locators) {
      removedLocators.addAll(locators);
      notifyAll();
    }

    /**
     * Waits up to {@code time} milliseconds for {@code locator} to be discovered.
     *
     * @return {@code true} if the locator has been discovered
     */
    public boolean waitForDiscovery(InetSocketAddress locator, long time)
        throws InterruptedException {
      return waitFor(discoveredLocators, locator, time);
    }

    /**
     * Waits up to {@code time} milliseconds for {@code locator} to be removed.
     *
     * @return {@code true} if the locator has been removed
     */
    public boolean waitForRemove(InetSocketAddress locator, long time)
        throws InterruptedException {
      return waitFor(removedLocators, locator, time);
    }

    /**
     * Blocks until {@code set} contains {@code locator} or {@code time} milliseconds elapse.
     * A non-positive remaining time ends the wait without blocking.
     */
    private synchronized boolean waitFor(Set set, InetSocketAddress locator,
        long time) throws InterruptedException {
      long remaining = time;
      long endTime = System.currentTimeMillis() + time;
      // Strictly positive: Object.wait(0) means "wait forever", which would defeat the timeout
      // whenever the remaining time lands on exactly zero.
      while (!set.contains(locator) && remaining > 0) {
        wait(remaining);
        remaining = endTime - System.currentTimeMillis();
      }
      return set.contains(locator);
    }

    /** Returns a copy of the locators discovered so far. */
    public synchronized Set getDiscovered() {
      return new HashSet(discoveredLocators);
    }

    /** Returns a copy of the locators removed so far. */
    public synchronized Set getRemoved() {
      return new HashSet(removedLocators);
    }
  }

  /**
   * Returns the sender in {@code senders} whose id equals {@code senderId}, or {@code null} if
   * none matches.
   */
  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
    for (GatewaySender s : senders) {
      if (s.getId().equals(senderId)) {
        return s;
      }
    }
    // if none of the senders matches with the supplied senderId, return null
    return null;
  }

  /**
   * Starts the first locator for distributed system {@code dsId} in this VM.
   *
   * @return the port the new locator was started on
   */
  public static Integer createFirstLocatorWithDSId(int dsId) {
    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "" + dsId);
    props.setProperty(LOCATORS, "localhost[" + port + "]");
    props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(USE_CLUSTER_CONFIGURATION, "false");
    props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
    test.getSystem(props);
    return port;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java new file mode 100755 index 0000000..96d441c --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.EvictionAttributes; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.client.Pool; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class CacheClientNotifierDUnitTest extends WANTestBase { + + private 
static final int NUM_KEYS = 10; + + private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity, + final String policy, final String diskStoreName) { + final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + + SerializableRunnable createCacheServer = new SerializableRunnable() { + @Override + public void run() throws Exception { + CacheServerImpl server = (CacheServerImpl)cache.addCacheServer(); + server.setPort(serverPort); + if (withCSC) { + if (diskStoreName != null) { + DiskStore ds = cache.findDiskStore(diskStoreName); + if(ds == null) { + ds = cache.createDiskStoreFactory().create(diskStoreName); + } + } + ClientSubscriptionConfig csc = server.getClientSubscriptionConfig(); + csc.setCapacity(capacity); + csc.setEvictionPolicy(policy); + csc.setDiskStoreName(diskStoreName); + server.setHostnameForClients("localhost"); + //server.setGroups(new String[]{"serv"}); + } + try { + server.start(); + } catch (IOException e) { + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); + } + } + }; + vm.invoke(createCacheServer); + return serverPort; + } + + private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final int capacity) { + SerializableRunnable checkCacheServer = new SerializableRunnable() { + + @Override + public void run() throws Exception { + List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver(); + CacheServerImpl server = null; + for (CacheServer cs:cacheServers) { + if (cs.getPort() == serverPort) { + server = (CacheServerImpl)cs; + break; + } + } + assertNotNull(server); + CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier(); + HAContainerRegion haContainer = (HAContainerRegion)ccn.getHaContainer(); + if (server.getAcceptor().isGatewayReceiver()) { + assertNull(haContainer); + return; + } + Region internalRegion = haContainer.getMapForTest(); + RegionAttributes ra = 
internalRegion.getAttributes(); + EvictionAttributes ea = ra.getEvictionAttributes(); + if (withCSC) { + assertNotNull(ea); + assertEquals(capacity, ea.getMaximum()); + assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction()); + } else { + assertNull(ea); + } + } + }; + vm.invoke(checkCacheServer); + } + + public static void closeACacheServer(final int serverPort) { + List<CacheServer> cacheServers = cache.getCacheServers(); + CacheServerImpl server = null; + for (CacheServer cs:cacheServers) { + if (cs.getPort() == serverPort) { + server = (CacheServerImpl)cs; + break; + } + } + assertNotNull(server); + server.stop(); + } + + private void verifyRegionSize(VM vm, final int expect) { + SerializableRunnable verifyRegionSize = new SerializableRunnable() { + @Override + public void run() throws Exception { + final Region region = cache.getRegion(getTestMethodName() + "_PR"); + + Wait.waitForCriterion(new WaitCriterion() { + public boolean done() { + return region.size() == expect; + } + public String description() { + return null; + } + }, 60000, 100, false); + assertEquals(expect, region.size()); + } + }; + vm.invoke(verifyRegionSize); + } + + /** + * The test will start several cache servers, including gateway receivers. + * Shutdown them and verify the CacheClientNotifier for each server is correct + */ + @Test + public void testNormalClient2MultipleCacheServer() throws Exception { + doMultipleCacheServer(false); + } + + public void doMultipleCacheServer(boolean durable) throws Exception { + /* test scenario: */ + /* create 1 GatewaySender on vm0 */ + /* create 1 GatewayReceiver on vm1 */ + /* create 2 cache servers on vm1, one with overflow. 
*/ + /* verify if the cache server2 still has the overflow attributes */ + /* create 1 cache client1 on vm2 to register interest on cache server1 */ + /* create 1 cache client2 on vm3 to register interest on cache server1 */ + /* do some puts to GatewaySender on vm0 */ + + // create sender at ln + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + // create receiver and cache servers will be at ny + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + vm1.invoke(() -> WANTestBase.createCache( nyPort )); + int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver()); + checkCacheServer(vm1, receiverPort, false, 0); + + // create PR for receiver + vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + // create cache server1 with overflow + int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT"); + checkCacheServer(vm1, serverPort, true, 3); + + // create cache server 2 + final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null); + // Currently, only the first cache server's overflow attributes will take effect + // It will be enhanced in GEODE-1102 + checkCacheServer(vm1, serverPort2, true, 3); + LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2); + + vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "123", durable)); + vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "124", durable)); + + vm0.invoke(() -> WANTestBase.createCache( lnPort )); + vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true )); + vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm0.invoke(() -> WANTestBase.startSender( "ln" )); + 
vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS )); + + /* verify */ + verifyRegionSize(vm0, NUM_KEYS); + verifyRegionSize(vm1, NUM_KEYS); + verifyRegionSize(vm3, NUM_KEYS); + verifyRegionSize(vm2, NUM_KEYS); + + // close a cache server, then re-test + vm1.invoke(() -> closeACacheServer(serverPort2)); + + vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 )); + + /* verify */ + verifyRegionSize(vm0, NUM_KEYS*2); + verifyRegionSize(vm1, NUM_KEYS*2); + verifyRegionSize(vm3, NUM_KEYS*2); + verifyRegionSize(vm2, NUM_KEYS*2); + + disconnectAllFromDS(); + } + + public static void createClientWithLocator(int port0,String host, + String regionName, String clientId, boolean isDurable) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, ""); + if (isDurable) { + props.setProperty(DURABLE_CLIENT_ID, clientId); + props.setProperty(DURABLE_CLIENT_TIMEOUT, "" + 200); + } + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + assertNotNull(cache); + CacheServerTestUtil.disableShufflingOfEndpoints(); + Pool p; + try { + p = PoolManager.createFactory().addLocator(host, port0) + .setPingInterval(250).setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1).setReadTimeout(2000) + .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10) + .setRetryAttempts(3).create(regionName); + } finally { + CacheServerTestUtil.enableShufflingOfEndpoints(); + } + + AttributesFactory factory = new AttributesFactory(); + factory.setPoolName(p.getName()); + factory.setDataPolicy(DataPolicy.NORMAL); + RegionAttributes attrs = factory.create(); + region = cache.createRegion(regionName, attrs); + region.registerInterest("ALL_KEYS"); + assertNotNull(region); + if (isDurable) { + cache.readyForEvents(); + } + LogWriterUtils.getLogWriter().info( + "Distributed Region " + 
regionName + " created Successfully :" + + region.toString() + " in a "+(isDurable?"durable":"")+" client"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java new file mode 100755 index 0000000..1d4e947 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/Simple2CacheServerDUnitTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.EvictionAttributes; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter; +import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.test.dunit.SerializableCallable; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; +import com.jayway.awaitility.Awaitility; + +@Category(DistributedTest.class) +public class Simple2CacheServerDUnitTest extends WANTestBase { + private static final int NUM_KEYS = 10; + static int 
afterPrimaryCount = 0; + static int afterProxyReinitialized = 0; + + public Simple2CacheServerDUnitTest() { + super(); + } + + @Test + public void testNormalClient2MultipleCacheServer() throws Exception { + doMultipleCacheServer(false); + } + + public void doMultipleCacheServer(boolean durable) throws Exception { + Integer lnPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + vm1.invoke(() -> WANTestBase.createCache( lnPort )); + vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + int serverPort = vm1.invoke(() -> WANTestBase.createCacheServer()); + int serverPort2 = vm1.invoke(() -> WANTestBase.createCacheServer()); + + if (durable) { + vm1.invoke(() -> setCacheClientProxyTestHook()); + } else { + vm2.invoke(() -> setClientServerObserver()); + } + vm2.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost", getTestMethodName() + "_PR" , "123", durable)); + + vm0.invoke(() -> WANTestBase.createCache( lnPort )); + vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + int serverPort3 = vm0.invoke(() -> WANTestBase.createCacheServer()); + + if (durable) { + vm1.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook()); + } else { + vm2.invoke(() -> checkResultAndUnsetClientServerObserver()); + } + Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1); }); + + // close the current primary cache server, then re-test + int serverPortAtVM1 = vm1.invoke(()-> findCacheServerForPrimaryProxy()); + if (serverPortAtVM1 != 0) { + vm1.invoke(()-> CacheClientNotifierDUnitTest.closeACacheServer(serverPortAtVM1)); + LogService.getLogger().info("Closed cache server on vm1:"+serverPortAtVM1); + Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return checkProxyIsPrimary(vm0) || checkProxyIsPrimary(vm1); }); + } 
else { + vm0.invoke(()-> CacheClientNotifierDUnitTest.closeACacheServer(serverPort3)); + LogService.getLogger().info("Closed cache server on vm0:"+serverPort3); + assertTrue(checkProxyIsPrimary(vm1)); + } + disconnectAllFromDS(); + } + + private static int findCacheServerForPrimaryProxy() { + List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServers(); + CacheServerImpl server = null; + for (CacheServer cs:cacheServers) { + server = (CacheServerImpl)cs; + long acceptorId = server.getAcceptor().getAcceptorId(); + for (CacheClientProxy proxy:CacheClientNotifier.getInstance().getClientProxies()) { + if (proxy.isPrimary() == false) { + continue; + } + if (proxy.getAcceptorId() == acceptorId) { + LogService.getLogger().info("Found cache server "+server+" for the primary proxy "+proxy); + return server.getPort(); + } + } + } + return 0; + } + + public static void setClientServerObserver() + { + PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = true; + ClientServerObserverHolder + .setInstance(new ClientServerObserverAdapter() { + public void afterPrimaryIdentificationFromBackup(ServerLocation primaryEndpoint) + { + LogService.getLogger().info("After primary is set"); + afterPrimaryCount++; + } + }); + } + + public static void checkResultAndUnsetClientServerObserver() + { + PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false; + // setPrimary only happened once + assertEquals(1, afterPrimaryCount); + afterPrimaryCount = 0; + } + + public static void setCacheClientProxyTestHook() + { + CacheClientProxy.testHook = new CacheClientProxy.TestHook() { + @Override + public void doTestHook(String spot) { + if (spot.equals("CLIENT_RECONNECTED")) { + afterProxyReinitialized++; + } + } + }; + } + + public static void checkResultAndUnsetCacheClientProxyTestHook() + { + // Reinitialize only happened once + CacheClientProxy.testHook = null; + assertEquals(1, afterProxyReinitialized); + afterProxyReinitialized = 0; + } + + private 
boolean checkProxyIsPrimary(VM vm) { + SerializableCallable checkProxyIsPrimary = new SerializableCallable() { + @Override + public Object call() throws Exception { + final CacheClientNotifier ccn = CacheClientNotifier.getInstance(); + Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> { return (ccn.getClientProxies().size() == 1); }); + + Iterator iter_prox = ccn.getClientProxies().iterator(); + CacheClientProxy proxy = (CacheClientProxy)iter_prox.next(); + return proxy.isPrimary(); + } + }; + return (Boolean)vm.invoke(checkProxyIsPrimary); + } +}
