This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit a22e9b504511eb43f77ef1810169e65f00c5417c Author: Kirk Lund <[email protected]> AuthorDate: Mon Apr 19 14:38:02 2021 -0700 GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 1 --- ...gionClearWithConcurrentOperationsDUnitTest.java | 97 +++++++++------------- 1 file changed, 41 insertions(+), 56 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java index 77537cbda3..c9a1e5b86d 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java @@ -28,11 +28,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import junitparams.JUnitParamsRunner; @@ -61,7 +56,6 @@ import org.apache.geode.distributed.internal.membership.api.MembershipManagerHel import org.apache.geode.internal.cache.versions.RegionVersionHolder; import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; -import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.CacheRule; @@ -73,15 +67,16 @@ import org.apache.geode.test.dunit.rules.DistributedRule; * added or removed. */ @RunWith(JUnitParamsRunner.class) +@SuppressWarnings("serial") public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements Serializable { - private static final Integer BUCKETS = 13; + + private static final int BUCKETS = 13; private static final String REGION_NAME = "PartitionedRegion"; private static final String TEST_CASE_NAME = "[{index}] {method}(Coordinator:{0}, RegionType:{1})"; @Rule public DistributedRule distributedRule = new DistributedRule(3); - @Rule public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); @@ -89,22 +84,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private VM server2; private VM accessor; - private enum TestVM { - ACCESSOR(0), SERVER1(1), SERVER2(2); - - final int vmNumber; - - TestVM(int vmNumber) { - this.vmNumber = vmNumber; - } - } - - static RegionShortcut[] regionTypes() { - return new RegionShortcut[] { - RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT - }; - } - @SuppressWarnings("unused") static TestVM[] coordinators() { return new TestVM[] { @@ -114,7 +93,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @SuppressWarnings("unused") static Object[] coordinatorsAndRegionTypes() { - ArrayList<Object[]> parameters = new ArrayList<>(); + List<Object[]> parameters = new ArrayList<>(); RegionShortcut[] regionShortcuts = regionTypes(); Arrays.stream(regionShortcuts).forEach(regionShortcut -> { @@ -125,6 +104,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements return parameters.toArray(); } + private static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT + }; + } + @Before public void setUp() throws Exception { server1 = getVM(TestVM.SERVER1.vmNumber); @@ -178,7 +163,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements /** * Populates the region and verifies the data on the selected VMs. */ - private void populateRegion(VM feeder, int entryCount, List<VM> vms) { + private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) { feeder.invoke(() -> { Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); @@ -240,14 +225,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements * Asserts that the region data is consistent across buckets. */ private void assertRegionBucketsConsistency() throws ForceReattemptException { - List<BucketDump> bucketDumps; PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); // Redundant copies + 1 primary. int expectedCopies = region.getRedundantCopies() + 1; for (int bId = 0; bId < BUCKETS; bId++) { final int bucketId = bId; - bucketDumps = region.getAllBucketEntries(bucketId); + List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId); assertThat(bucketDumps.size()) .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has " + bucketDumps.size()) @@ -379,26 +363,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements /** * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the - * given - * durationInMillis. + * given durationInMillis. */ - private void executeClears(final long durationInMillis, final long periodInMillis) - throws InterruptedException { + private void executeClears(final long durationInMillis, final long periodInMillis) { Cache cache = cacheRule.getCache(); - AtomicLong invocationCount = new AtomicLong(0); Region<String, String> region = cache.getRegion(REGION_NAME); - Long minimumInvocationCount = (durationInMillis / periodInMillis); - ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(() -> { + long minimumInvocationCount = durationInMillis / periodInMillis; + + for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) { region.clear(); - invocationCount.incrementAndGet(); - }, 0, periodInMillis, TimeUnit.MILLISECONDS); - - await().untilAsserted( - () -> assertThat(invocationCount.get()).isGreaterThanOrEqualTo(minimumInvocationCount)); - scheduledFuture.cancel(false); - executor.shutdown(); - executor.awaitTermination(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS); + } } /** @@ -413,11 +387,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @Parameters(method = "coordinatorsAndRegionTypes") public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM, RegionShortcut regionShortcut) throws InterruptedException { - final int entries = 15000; - final int workMillis = 60000; parametrizedSetup(regionShortcut); // Let all VMs continuously execute puts and gets for 60 seconds. + final int workMillis = 60000; + final int entries = 15000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executePuts(entries, workMillis)), server2.invokeAsync(() -> executeGets(entries, workMillis)), @@ -448,10 +422,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @Parameters(method = "coordinatorsAndRegionTypes") public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM, RegionShortcut regionShortcut) throws InterruptedException { - final int workMillis = 15000; parametrizedSetup(regionShortcut); // Let all VMs continuously execute putAll and removeAll for 15 seconds. + final int workMillis = 15000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)), server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)), @@ -486,8 +460,8 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(RegionType:{0})") @Parameters(method = "regionTypes") public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) { - final int entries = 1000; parametrizedSetup(regionShortcut); + final int entries = 1000; populateRegion(accessor, entries, asList(accessor, server1, server2)); // Set the CoordinatorMemberKiller and try to clear the region @@ -529,13 +503,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - final int entries = 7500; - final int workMillis = 30000; parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); + final int entries = 7500; populateRegion(accessor, entries, asList(accessor, server1, server2)); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" + final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executeGets(entries, workMillis)), server1.invokeAsync(() -> executePuts(entries, workMillis)), @@ -581,13 +555,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - final int entries = 7500; - final int workMillis = 30000; parametrizedSetup(RegionShortcut.PARTITION); + final int entries = 7500; populateRegion(accessor, entries, asList(accessor, server1, server2)); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" + final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executeGets(entries, workMillis)), server1.invokeAsync(() -> executePuts(entries, workMillis)), @@ -620,11 +594,11 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - final int workMillis = 30000; parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs continuously execute putAll/removeAll for 30 seconds. + final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); @@ -666,10 +640,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements @TestCaseName("[{index}] {method}(Coordinator:{0})") public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced( TestVM coordinatorVM) throws InterruptedException { - final int workMillis = 30000; parametrizedSetup(RegionShortcut.PARTITION); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); + final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); @@ -686,13 +660,24 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } } + private enum TestVM { + ACCESSOR(0), SERVER1(1), SERVER2(2); + + final int vmNumber; + + TestVM(int vmNumber) { + this.vmNumber = vmNumber; + } + } + /** * Shutdowns a coordinator member while the clear operation is in progress. */ - public static class MemberKiller extends DistributionMessageObserver { + private static class MemberKiller extends DistributionMessageObserver { + private final boolean coordinator; - public MemberKiller(boolean coordinator) { + private MemberKiller(boolean coordinator) { this.coordinator = coordinator; }
