This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-6908 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c57d15803dd9c9146af92a027724eaf777aa22d4 Author: zhouxh <[email protected]> AuthorDate: Fri Jun 28 00:06:11 2019 -0700 GEODE-6908: retried REMOVE should not create new version tag. added retry dunit tests for all the c/s operations. --- .../pdx/ClientsWithVersioningRetryDUnitTest.java | 285 +++++++++++++++++---- .../apache/geode/internal/cache/LocalRegion.java | 9 +- 2 files changed, 238 insertions(+), 56 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java index e992d21..5beab36 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/ClientsWithVersioningRetryDUnitTest.java @@ -49,6 +49,7 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.DistributedCacheOperation; import org.apache.geode.internal.cache.DistributedPutAllOperation; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; @@ -117,7 +118,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { final VM vm0 = host.getVM(0); final VM vm1 = host.getVM(1); - createServerRegion(vm0, RegionShortcut.REPLICATE); createServerRegion(vm1, RegionShortcut.REPLICATE); @@ -160,7 +160,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { event.setContext(new ClientProxyMembershipID(memberID)); boolean recovered = ((BaseCommand) Put70.getCommand()).recoverVersionTagForRetriedOperation(event); - assertTrue("Expected to recover the version for this event ID", recovered); + assertTrue("Expected to recover the version for this event ID", + recovered); assertEquals("Expected the region version to be 123", 123, event.getVersionTag().getRegionVersion()); } finally { @@ -173,10 +174,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { vm1.invoke(new SerializableRunnable("recover posdup event tag in vm1 event tracker from vm0") { @Override public void run() { - DistributedRegion dr = (DistributedRegion) getCache().getRegion("region"); + DistributedRegion dr = (DistributedRegion) getCache() + .getRegion("region"); EventID eventID = new EventID(new byte[0], 1, 0); - EntryEventImpl event = EntryEventImpl.create(dr, Operation.CREATE, "TestObject", - "TestValue", null, false, memberID, true, eventID); + EntryEventImpl event = EntryEventImpl + .create(dr, Operation.CREATE, "TestObject", + "TestValue", null, false, memberID, true, eventID); event.setPossibleDuplicate(true); try { dr.hasSeenEvent(event); @@ -189,6 +192,162 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { }); } + enum OpType { + CREATE, + PUT, + DESTROY, + INVALIDATE, + PUT_IF_ABSENT, + REPLACE, + REPLACE_WITH_OLDVALUE, + REMOVE + }; + + @Test + public void testRetriedCreate() { + doRetriedTest(OpType.CREATE); + } + + @Test + public void testRetriedPut() { + doRetriedTest(OpType.PUT); + } + + @Test + public void testRetriedDestroy() { + doRetriedTest(OpType.DESTROY); + } + + @Test + public void testRetriedInvalidate() { + doRetriedTest(OpType.INVALIDATE); + } + + @Test + public void testRetriedPutIfAbsent() { + doRetriedTest(OpType.PUT_IF_ABSENT); + } + + @Test + public void testRetriedReplace() { + doRetriedTest(OpType.REPLACE); + } + + @Test + public void testRetriedReplaceWithOldValue() { + doRetriedTest(OpType.REPLACE_WITH_OLDVALUE); + } + + @Test + public void testRetriedRemove() { + doRetriedTest(OpType.REMOVE); + } + + private void doRetriedTest(final OpType opType) { + Host host = Host.getHost(0); + final VM vm0 = host.getVM(0); + final VM vm1 = host.getVM(1); + final VM vm3 = host.getVM(3); + + int port0 = createServerRegion(vm0, RegionShortcut.REPLICATE); + int port1 = createServerRegion(vm1, RegionShortcut.REPLICATE); + createClientRegion(vm3, port0, port1); + + vm0.invoke(new SerializableRunnable() { + + @Override + public void run() { + Region region = getCache().getRegion("region"); + if (opType != OpType.CREATE && opType != OpType.PUT_IF_ABSENT) { + region.put(0, "value"); + } + + // Add a listener to close vm0 when we send a distributed operation + // this will cause a retry after we have applied the original put all to + // the cache, causing a retry + DistributionMessageObserver + .setInstance(new DistributionMessageObserver() { + + @Override + public void beforeSendMessage(ClusterDistributionManager dm, + DistributionMessage message) { + if (message instanceof DistributedCacheOperation.CacheOperationMessage) { + DistributedCacheOperation.CacheOperationMessage com = + (DistributedCacheOperation.CacheOperationMessage) message; + VersionTag tag = com.getVersionTag(); + if (((opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT) + && tag.getEntryVersion() == 1) || tag.getEntryVersion() == 2) { + DistributionMessageObserver.setInstance(null); + disconnectFromDS(vm0); + } + } + } + }); + + } + }); + + // this put operation will trigger vm1 to be closed, and the put will be retried + vm3.invoke(new SerializableCallable("perform update in client") { + @Override + public Object call() throws Exception { + Region region = getCache().getRegion("region"); + switch (opType) { + case CREATE: + region.create(0, "newvalue"); + break; + case PUT: + region.put(0, "newvalue"); + break; + case DESTROY: + region.destroy(0); + break; + case INVALIDATE: + region.invalidate(0); + break; + case PUT_IF_ABSENT: + region.putIfAbsent(0, "newvalue"); + break; + case REPLACE: + region.replace(0, "newvalue"); + break; + case REPLACE_WITH_OLDVALUE: + region.replace(0, "value", "newvalue"); + break; + case REMOVE: + region.remove(0, "value"); + break; + } + return null; + } + }); + + // Verify the observer was triggered + vm1.invoke(new SerializableRunnable() { + + @Override + public void run() { + // if the observer was triggered, it would have cleared itself + assertNull(DistributionMessageObserver.getInstance()); + VersionTag tag = ((LocalRegion) getCache().getRegion("region")) + .getVersionTag(0); + if (opType == OpType.CREATE || opType == OpType.PUT_IF_ABSENT) { + assertEquals(1, tag.getRegionVersion()); + } else { + assertEquals(2, tag.getRegionVersion()); + } + } + }); + + // Make sure vm0 did in fact shut down + vm0.invoke(new SerializableRunnable() { + @Override + public void run() { + GemFireCacheImpl cache = (GemFireCacheImpl) basicGetCache(); + assertTrue(cache == null || cache.isClosed()); + } + }); + } /** * Test that we can successfully retry a distributed put all and get the version information. bug @@ -202,7 +361,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { final VM vm2 = host.getVM(2); final VM vm3 = host.getVM(3); - createServerRegion(vm0, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT); vm0.invoke(new SerializableRunnable() { @@ -215,27 +373,28 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { // Add a listener to close vm1 when we send a distributed put all operation // this will cause a retry after we have applied the original put all to // the cache, causing a retry - DistributionMessageObserver.setInstance(new DistributionMessageObserver() { - - @Override - public void beforeSendMessage(ClusterDistributionManager dm, - DistributionMessage message) { - if (message instanceof DistributedPutAllOperation.PutAllMessage) { - DistributionMessageObserver.setInstance(null); - disconnectFromDS(vm1); - } - } - }); + DistributionMessageObserver + .setInstance(new DistributionMessageObserver() { + + @Override + public void beforeSendMessage(ClusterDistributionManager dm, + DistributionMessage message) { + if (message instanceof DistributedPutAllOperation.PutAllMessage) { + DistributionMessageObserver.setInstance(null); + disconnectFromDS(vm1); + } + } + }); } }); - int port1 = createServerRegion(vm1, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT); - int port2 = createServerRegion(vm2, RegionShortcut.PARTITION_REDUNDANT_PERSISTENT); + int port1 = createServerRegion(vm1, + RegionShortcut.PARTITION_REDUNDANT_PERSISTENT); + int port2 = createServerRegion(vm2, + RegionShortcut.PARTITION_REDUNDANT_PERSISTENT); createClientRegion(vm3, port1, port2); - - // This will be a put all to bucket 0 // Here's the expected sequence // client->vm1 (accessor0) @@ -310,31 +469,37 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { LogWriterUtils.getLogWriter().info("creating region in vm3"); createRegionInPeer(vm3, RegionShortcut.PARTITION_PROXY); - expectedExceptions.add(IgnoredException.addIgnoredException("RuntimeException", vm2)); - vm2.invoke(new SerializableRunnable("install message listener to ignore update") { - @Override - public void run() { - // Add a listener to close vm2 when we send a distributed put all operation - // this will cause a retry after we have applied the original put all to - // the cache, causing a retry - DistributionMessageObserver.setInstance(new DistributionMessageObserver() { - + expectedExceptions + .add(IgnoredException.addIgnoredException("RuntimeException", vm2)); + vm2.invoke( + new SerializableRunnable("install message listener to ignore update") { @Override - public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage msg) { - if (msg instanceof DistributedPutAllOperation.PutAllMessage) { - DistributionMessageObserver.setInstance(null); - Wait.pause(5000); // give vm1 time to process the message that we're ignoring - disconnectFromDS(vm0); - // no reply will be sent to vm0 due to this exception, but that's okay - // because vm0 has been shut down - throw new RuntimeException("test code is ignoring message: " + msg); - } + public void run() { + // Add a listener to close vm2 when we send a distributed put all operation + // this will cause a retry after we have applied the original put all to + // the cache, causing a retry + DistributionMessageObserver + .setInstance(new DistributionMessageObserver() { + + @Override + public void beforeProcessMessage(ClusterDistributionManager dm, + DistributionMessage msg) { + if (msg instanceof DistributedPutAllOperation.PutAllMessage) { + DistributionMessageObserver.setInstance(null); + Wait.pause( + 5000); // give vm1 time to process the message that we're ignoring + disconnectFromDS(vm0); + // no reply will be sent to vm0 due to this exception, but that's okay + // because vm0 has been shut down + throw new RuntimeException( + "test code is ignoring message: " + msg); + } + } + }); + } }); - } - }); - // This will be a put all to bucket 0 // Here's the expected sequence // accessor->vm0 (primary) @@ -354,7 +519,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { } }); - // verify that the version is correct vm1.invoke(new SerializableRunnable("verify vm1") { @@ -369,7 +533,6 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { } }); - // Verify the observer was triggered and the version is correct vm2.invoke(new SerializableRunnable("verify vm2") { @@ -404,13 +567,13 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { }); } - - private int createServerRegion(VM vm, final RegionShortcut shortcut) { - SerializableCallable createRegion = new SerializableCallable("create server region") { + SerializableCallable createRegion = new SerializableCallable( + "create server region") { @Override public Object call() throws Exception { - RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut); + RegionFactory<Object, Object> rf = getCache() + .createRegionFactory(shortcut); if (!shortcut.equals(RegionShortcut.REPLICATE)) { rf.setPartitionAttributes( new PartitionAttributesFactory().setRedundantCopies(2).create()); @@ -429,10 +592,12 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { } private void createRegionInPeer(VM vm, final RegionShortcut shortcut) { - SerializableCallable createRegion = new SerializableCallable("create peer region") { + SerializableCallable createRegion = new SerializableCallable( + "create peer region") { @Override public Object call() throws Exception { - RegionFactory<Object, Object> rf = getCache().createRegionFactory(shortcut); + RegionFactory<Object, Object> rf = getCache() + .createRegionFactory(shortcut); if (!shortcut.equals(RegionShortcut.REPLICATE)) { rf.setPartitionAttributes( new PartitionAttributesFactory().setRedundantCopies(2).create()); @@ -451,7 +616,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { return p; } - private int createServerRegionWithPersistence(VM vm, final boolean persistentPdxRegistry) { + private int createServerRegionWithPersistence(VM vm, + final boolean persistentPdxRegistry) { SerializableCallable createRegion = new SerializableCallable() { @Override public Object call() throws Exception { @@ -461,7 +627,8 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { } // Cache cache = getCache(cf); - cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("store"); + cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()) + .create("store"); AttributesFactory af = new AttributesFactory(); af.setScope(Scope.DISTRIBUTED_ACK); @@ -500,17 +667,25 @@ public class ClientsWithVersioningRetryDUnitTest extends JUnit4CacheTestCase { return (Integer) vm.invoke(createRegion); } - private void createClientRegion(final VM vm, final int port1, final int port2) { - SerializableCallable createRegion = new SerializableCallable("create client region in " + vm) { + private void createClientRegion(final VM vm, final int port1, + final int port2) { + SerializableCallable createRegion = new SerializableCallable( + "create client region in " + vm) { @Override public Object call() throws Exception { ClientCacheFactory cf = new ClientCacheFactory(); + cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port1); cf.addPoolServer(NetworkUtils.getServerHostName(vm.getHost()), port2); cf.setPoolPRSingleHopEnabled(false); cf.setPoolReadTimeout(10 * 60 * 1000); + cf.setPoolSubscriptionEnabled(true); + ClientCache cache = getClientCache(cf); - cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region"); + Region region = + cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create("region"); + region.registerInterest("ALL_KEYS"); return null; } }; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 3a70b9f..c958d7c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -3101,7 +3101,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return result; } - private void cacheWriteBeforeRegionClear(RegionEventImpl event) + protected void cacheWriteBeforeRegionClear(RegionEventImpl event) throws CacheWriterException, TimeoutException { // copy into local var to prevent race condition CacheWriter writer = basicGetWriter(); @@ -5428,6 +5428,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, event.setContext(memberId); // if this is a replayed or WAN operation we may already have a version tag event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); try { basicDestroy(event, true, null); } catch (ConcurrentCacheModificationException ignore) { @@ -5459,6 +5460,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, // if this is a replayed operation we may already have a version tag event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); try { basicInvalidate(event); @@ -5485,6 +5487,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, // if this is a replayed operation we may already have a version tag event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); try { basicUpdateEntryVersion(event); @@ -8429,6 +8432,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } RegionVersionVector myVector = getVersionVector(); + System.out.println(Thread.currentThread().getName() + ": LocalRegion.clearRegionLocally region=" + + getName() + "; myVector=" + myVector); if (myVector != null) { if (isRvvDebugEnabled) { logger.trace(LogMarker.RVV_VERBOSE, "processing version information for {}", regionEvent); @@ -10764,6 +10769,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, try { event.setContext(memberId); + event.setVersionTag(clientEvent.getVersionTag()); + event.setPossibleDuplicate(clientEvent.isPossibleDuplicate()); // we rely on exceptions to tell us that the operation didn't take // place. AbstractRegionMap performs the checks and throws the exception try {
