This is an automated email from the ASF dual-hosted git repository. ladyvader pushed a commit to branch feature/GEODE-4987 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7729d71c98a7749b72d3e751caa2b2fb29949237 Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> AuthorDate: Mon Apr 2 16:00:18 2018 -0700 GEODE-4987: Add rebalancing tests with colocated PRs and AEQ * Added new tests for colocated PRs and AEQ --- .../cache/control/RebalanceOperationDUnitTest.java | 234 ++++++++++++++++++++- 1 file changed, 233 insertions(+), 1 deletion(-) diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java index 0cdc4e6..e2591fc 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java @@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENFORCE_UNIQU import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -42,6 +43,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; import org.junit.Ignore; import org.junit.Test; @@ -62,6 +64,7 @@ import org.apache.geode.cache.LoaderHelper; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.asyncqueue.AsyncEvent; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; @@ -104,7 +107,10 @@ import org.apache.geode.test.junit.categories.DistributedTest; @Category(DistributedTest.class) public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase { - private static final long MAX_WAIT = 60; + private static final long MAX_WAIT = 600; + private static final String parentRegion = "parent-region"; + private static final String keyBase = "Object_"; + private static final String colocatedRegionBase = "colocated-region-"; @Override public final void postTearDownCacheTestCase() throws Exception { @@ -118,6 +124,232 @@ public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase { System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "resource.manager.threads"); } + protected void putEntryInEachBucket(String regionName, int numBuckets) { + final Cache cache = getCache(); + Region<Object, Object> region = cache.getRegion(regionName); + assertNotNull(cache.getRegion(regionName)); + System.out.println("executing " + numBuckets + " ops on " + regionName); + IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i)); + System.out.println("executed " + numBuckets + " ops on " + regionName); + } + + protected SerializableRunnable getCreatePRRunnable(String regionName) { + SerializableRunnable runnable = new SerializableRunnable("createPR") { + public void run() { + createPR(parentRegion); + assertNotNull(getCache().getRegion(parentRegion) != null); + } + }; + return runnable; + } + + protected SerializableRunnable getConcOpsRunnable(String regionName, int numBuckets) { + SerializableRunnable runnable = new SerializableRunnable("doConcOps") { + public void run() { + final Cache cache = getCache(); + Region<Object, Object> region = cache.getRegion(regionName); + System.out.println("In concOps on " + regionName); + IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i)); + System.out.println("Done with " + numBuckets + " ops on " + regionName); + } + }; + return runnable; + } + + protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") { + public void run() { + Cache cache = getCache(); + ResourceManager manager = cache.getResourceManager(); + System.out.println("starting rebalance"); + RebalanceResults results = doRebalance(false, manager); + Set<PartitionRebalanceInfo> prInfoSet = results.getPartitionRebalanceDetails(); + StringBuffer aStr; + for (PartitionRebalanceInfo prInfo : prInfoSet) { + aStr = new StringBuffer(); + aStr.append(prInfo.getRegionPath()); + aStr.append(" bucketTransfers = " + prInfo.getBucketTransfersCompleted()); + aStr.append(" primaryTransfers = " + prInfo.getPrimaryTransfersCompleted()); + aStr.append(" bucketCreates = " + prInfo.getBucketCreatesCompleted()); + System.out.println("rebalance info for " + aStr.toString()); + } + aStr = new StringBuffer(); + aStr.append(" bucketTransfers = " + results.getTotalBucketTransfersCompleted()); + aStr.append(" primaryTransfers = " + results.getTotalPrimaryTransfersCompleted()); + aStr.append(" bucketCreates = " + results.getTotalBucketCreatesCompleted()); + System.out.println("finished rebalance : overall rebalance results = " + aStr.toString()); + } + }; + + protected SerializableRunnable getCreateAEQRunnable(final String parentRegion) { + SerializableRunnable runnable = new SerializableRunnable("createAEQ") { + @Override + public void run() throws Exception { + // Create an async event listener that doesn't dispatch anything + cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true) + .create("parallelQueue", new AsyncEventListener() { + @Override + public void close() {} + + @Override + public boolean processEvents(List<AsyncEvent> events) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return false; + } + return false; + } + }); + Region region = cache.getRegion(parentRegion); + region.getAttributesMutator().addAsyncEventQueueId("parallelQueue"); + } + }; + return runnable; + } + + protected SerializableRunnable getCreateColocatedRegionRunnable(final String parentRegion, + final String regionName, final RegionAttributes ratts) { + SerializableRunnable runnable = new SerializableRunnable("createdColocatedRegion") { + @Override + public void run() throws Exception { + Cache cache = getCache(); + AttributesFactory attr = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(1); + paf.setRecoveryDelay(-1); + paf.setStartupRecoveryDelay(-1); + paf.setColocatedWith(parentRegion); + PartitionAttributes prAttr = paf.create(); + attr.setPartitionAttributes(prAttr); + RegionAttributes ratts = attr.create(); + + System.out.println("creating colocatedRegion " + regionName); + Region region = cache.createRegion(regionName, ratts); + System.out.println("created colocatedRegion " + regionName); + } + }; + return runnable; + } + + @Test + public void testRebalanceDuringColocatedPRCreation() throws Exception { + + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + + // Create the region in 3 of 4 VMs + vm0.invoke(getCreatePRRunnable(parentRegion)); + vm1.invoke(getCreatePRRunnable(parentRegion)); + vm2.invoke(getCreatePRRunnable(parentRegion)); + + // add data (but don't define all buckets) + int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes() + .getPartitionAttributes().getTotalNumBuckets()); + vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2)); + + // give rebalance some work to do by adding another vm + vm3.invoke(getCreatePRRunnable(parentRegion)); + + // create colocated regions while rebalance is in progress + AttributesFactory attr = new AttributesFactory(); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(1); + paf.setRecoveryDelay(-1); + paf.setStartupRecoveryDelay(-1); + paf.setColocatedWith(parentRegion); + PartitionAttributes prAttr = paf.create(); + attr.setPartitionAttributes(prAttr); + RegionAttributes ratts = attr.create(); + + for (int i = 0; i < 1; i++) { + + AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance); + + String colocatedRegionName = colocatedRegionBase + i; + SerializableRunnable createColocatedRegion = + getCreateColocatedRegionRunnable(parentRegion, colocatedRegionName, ratts); + + AsyncInvocation aiConcOps = vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets)); + + AsyncInvocation ai1 = vm0.invokeAsync(createColocatedRegion); + AsyncInvocation ai2 = vm1.invokeAsync(createColocatedRegion); + AsyncInvocation ai3 = vm2.invokeAsync(createColocatedRegion); + AsyncInvocation ai4 = vm3.invokeAsync(createColocatedRegion); + + aiConcOps.join(); + aiConcOps.checkException(); + + aiRebalancer.join(); + aiRebalancer.checkException(); + + ai1.join(); + ai2.join(); + ai3.join(); + ai4.join(); + + ai1.checkException(); + ai2.checkException(); + ai3.checkException(); + ai4.checkException(); + + } + + AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance); + aiRebalancer.join(); + aiRebalancer.checkException(); + + } + + @Test + public void testRebalanceDuringAEQCreation() throws Exception { + + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + + // Create the region in 3 of 4 VMs + vm0.invoke(getCreatePRRunnable(parentRegion)); + vm1.invoke(getCreatePRRunnable(parentRegion)); + vm2.invoke(getCreatePRRunnable(parentRegion)); + + // add data (but don't define all buckets) + int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes() + .getPartitionAttributes().getTotalNumBuckets()); + vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2)); + + // give rebalance some work to do by adding another vm + vm3.invoke(getCreatePRRunnable(parentRegion)); + + AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance); + + AsyncInvocation ai1 = vm0.invokeAsync(getCreateAEQRunnable(parentRegion)); + AsyncInvocation ai2 = vm1.invokeAsync(getCreateAEQRunnable(parentRegion)); + AsyncInvocation ai3 = vm2.invokeAsync(getCreateAEQRunnable(parentRegion)); + AsyncInvocation ai4 = vm3.invokeAsync(getCreateAEQRunnable(parentRegion)); + + ai1.join(); + ai2.join(); + ai3.join(); + ai4.join(); + + ai1.checkException(); + ai2.checkException(); + ai3.checkException(); + ai4.checkException(); + + aiRebalancer.join(); + aiRebalancer.checkException(); + + aiRebalancer = vm0.invokeAsync(rebalance); + aiRebalancer.join(); + aiRebalancer.checkException(); + } + @Test public void testRecoverRedundancySimulation() { recoverRedundancy(true); -- To stop receiving notification emails like this one, please contact ladyva...@apache.org.