This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit ce2a9eebfaa133978ef54d28d539498174eebc8b Author: Kirk Lund <kl...@apache.org> AuthorDate: Mon Apr 19 16:42:22 2021 -0700 GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 3 --- ...gionClearWithConcurrentOperationsDUnitTest.java | 99 ++++++++++++---------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java index b2aacc0..7ef187f 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java @@ -53,6 +53,7 @@ import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType; import org.apache.geode.internal.cache.versions.RegionVersionHolder; import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; @@ -83,10 +84,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private VM accessor; @Before - public void setUp() throws Exception { - server1 = getVM(TestVM.SERVER1.vmNumber); - server2 = getVM(TestVM.SERVER2.vmNumber); - accessor = getVM(TestVM.ACCESSOR.vmNumber); + public void setUp() { + server1 = getVM(TestVM.SERVER1.getVmId()); + server2 = getVM(TestVM.SERVER2.getVmId()); + accessor = getVM(TestVM.ACCESSOR.getVmId()); } /** @@ -102,7 +103,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM, RegionShortcut regionShortcut) throws InterruptedException { - parametrizedSetup(regionShortcut); + createRegions(regionShortcut); // Let all VMs continuously execute puts and gets for 60 seconds. final int workMillis = 60000; @@ -113,7 +114,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); // Clear the region every second for 60 seconds. - getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000)); + getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000)); // Let asyncInvocations finish. for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { @@ -138,7 +139,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM, RegionShortcut regionShortcut) throws InterruptedException { - parametrizedSetup(regionShortcut); + createRegions(regionShortcut); // Let all VMs continuously execute putAll and removeAll for 15 seconds. final int workMillis = 15000; @@ -151,7 +152,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis))); // Clear the region every half second for 15 seconds. - getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500)); + getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500)); // Let asyncInvocations finish. for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { @@ -176,7 +177,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @Parameters({"PARTITION", "PARTITION_REDUNDANT"}) @TestCaseName("[{index}] {method}(RegionType:{0})") public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) { - parametrizedSetup(regionShortcut); + createRegions(regionShortcut); final int entries = 1000; populateRegion(accessor, entries, asList(accessor, server1, server2)); @@ -192,7 +193,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements // Wait for member to get back online and assign all buckets. server1.invoke(() -> { cacheRule.createCache(); - initDataStore(regionShortcut); + createDataStore(regionShortcut); await().untilAsserted( () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME)); @@ -219,7 +220,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); + createRegions(RegionShortcut.PARTITION_REDUNDANT); final int entries = 7500; populateRegion(accessor, entries, asList(accessor, server1, server2)); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); @@ -234,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements // Retry the clear operation on the region until success (server2 will go down, but other // members will eventually become primary for those buckets previously hosted by server2). - executeClearWithRetry(getVM(coordinatorVM.vmNumber)); + executeClearWithRetry(getVM(coordinatorVM.getVmId())); // Wait for member to get back online. server2.invoke(() -> { cacheRule.createCache(); - initDataStore(RegionShortcut.PARTITION_REDUNDANT); + createDataStore(RegionShortcut.PARTITION_REDUNDANT); await().untilAsserted( () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); }); @@ -271,7 +272,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION); + createRegions(RegionShortcut.PARTITION); final int entries = 7500; populateRegion(accessor, entries, asList(accessor, server1, server2)); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); @@ -285,7 +286,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { + getVM(coordinatorVM.getVmId()).invoke(() -> { assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) .isInstanceOf(PartitionedRegionPartialClearException.class); }); @@ -310,7 +311,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); + createRegions(RegionShortcut.PARTITION_REDUNDANT); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs continuously execute putAll/removeAll for 30 seconds. @@ -321,12 +322,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements // Retry the clear operation on the region until success (server2 will go down, but other // members will eventually become primary for those buckets previously hosted by server2). - executeClearWithRetry(getVM(coordinatorVM.vmNumber)); + executeClearWithRetry(getVM(coordinatorVM.getVmId())); // Wait for member to get back online. server2.invoke(() -> { cacheRule.createCache(); - initDataStore(RegionShortcut.PARTITION_REDUNDANT); + createDataStore(RegionShortcut.PARTITION_REDUNDANT); await().untilAsserted( () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); }); @@ -356,7 +357,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION); + createRegions(RegionShortcut.PARTITION); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); final int workMillis = 30000; @@ -365,7 +366,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { + getVM(coordinatorVM.getVmId()).invoke(() -> { assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) .isInstanceOf(PartitionedRegionPartialClearException.class); }); @@ -376,7 +377,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } } - private void initAccessor(RegionShortcut regionShortcut) { + private void createAccessor(RegionShortcut regionShortcut) { PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() .setTotalNumBuckets(BUCKETS) .setLocalMaxMemory(0) @@ -388,7 +389,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } - private void initDataStore(RegionShortcut regionShortcut) { + private void createDataStore(RegionShortcut regionShortcut) { PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() .setTotalNumBuckets(BUCKETS) .create(); @@ -398,10 +399,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements .create(REGION_NAME); } - private void parametrizedSetup(RegionShortcut regionShortcut) { - server1.invoke(() -> initDataStore(regionShortcut)); - server2.invoke(() -> initDataStore(regionShortcut)); - accessor.invoke(() -> initAccessor(regionShortcut)); + private void createRegions(RegionShortcut regionShortcut) { + server1.invoke(() -> createDataStore(regionShortcut)); + server2.invoke(() -> createDataStore(regionShortcut)); + accessor.invoke(() -> createAccessor(regionShortcut)); } private void waitForSilence() { @@ -637,10 +638,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private enum TestVM { ACCESSOR(0), SERVER1(1), SERVER2(2); - final int vmNumber; + private final int vmId; - TestVM(int vmNumber) { - this.vmNumber = vmNumber; + TestVM(int vmId) { + this.vmId = vmId; + } + + int getVmId() { + return vmId; } } @@ -656,24 +661,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Shutdowns the VM whenever the message is an instance of - * {@link PartitionedRegionClearMessage}. - */ - private void shutdownMember(DistributionMessage message) { - if (message instanceof PartitionedRegionClearMessage) { - if (((PartitionedRegionClearMessage) message) - .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { - DistributionMessageObserver.setInstance(null); - InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect(); - MembershipManagerHelper - .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()); - await().untilAsserted( - () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull()); - } - } - } - - /** * Invoked only on clear coordinator VM. * * @param dm the distribution manager that received the message @@ -702,5 +689,23 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements super.beforeProcessMessage(dm, message); } } + + /** + * Shutdowns the VM whenever the message is an instance of + * {@link PartitionedRegionClearMessage}. + */ + private void shutdownMember(DistributionMessage message) { + if (message instanceof PartitionedRegionClearMessage) { + PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message; + if (clearMessage.getOperationType() == OperationType.OP_PR_CLEAR) { + DistributionMessageObserver.setInstance(null); + InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect(); + MembershipManagerHelper + .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance()); + await().untilAsserted( + () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull()); + } + } + } } }