This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a8ae434990553dd15aa972587675a7a530f0b167
Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io>
AuthorDate: Mon Apr 2 16:00:18 2018 -0700

    GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
    
    * Added new tests for colocated PRs and AEQ
---
 .../cache/control/RebalanceOperationDUnitTest.java | 234 ++++++++++++++++++++-
 1 file changed, 233 insertions(+), 1 deletion(-)

diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
index 0cdc4e6..12f2c1c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -18,6 +18,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.ENFORCE_UNIQU
 import static 
org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -42,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -104,7 +107,10 @@ import 
org.apache.geode.test.junit.categories.DistributedTest;
 @Category(DistributedTest.class)
 public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
 
-  private static final long MAX_WAIT = 60;
+  private static final long MAX_WAIT = 180; // NOTE(review): unit not visible in this hunk; presumably seconds - confirm at use sites
+  private static final String parentRegion = "parent-region"; // region created via createPR and used as the colocation target
+  private static final String keyBase = "Object_"; // key prefix; an integer index is appended for each put
+  private static final String colocatedRegionBase = "colocated-region-"; // colocated region name prefix; the loop index is appended
 
   @Override
   public final void postTearDownCacheTestCase() throws Exception {
@@ -118,6 +124,232 @@ public class RebalanceOperationDUnitTest extends 
JUnit4CacheTestCase {
     System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + 
"resource.manager.threads");
   }
 
+  /**
+   * Puts {@code numBuckets} entries into the named region, using keys {@code keyBase + i} and
+   * values {@code "value_" + i} for every {@code i} in {@code [0, numBuckets)}.
+   *
+   * <p>
+   * NOTE(review): the method name implies one entry lands in each bucket; that depends on how
+   * these keys are routed to buckets, which is not visible here.
+   *
+   * @param regionName name of a region that must already exist in this VM's cache
+   * @param numBuckets number of entries to put
+   */
+  protected void putEntryInEachBucket(String regionName, int numBuckets) {
+    final Region<Object, Object> region = getCache().getRegion(regionName);
+    // Fail with a descriptive assertion rather than an NPE inside the lambda below.
+    assertNotNull("region " + regionName + " does not exist", region);
+    System.out.println("executing " + numBuckets + " ops on " + regionName);
+    IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i));
+    System.out.println("executed " + numBuckets + " ops on " + regionName);
+  }
+
+  /**
+   * Builds a runnable that creates the region {@code regionName} through {@code createPR} and
+   * then asserts that the region can be looked up in the cache.
+   *
+   * @param regionName name of the region to create
+   * @return a runnable named "createPR"
+   */
+  protected SerializableRunnable getCreatePRRunnable(String regionName) {
+    return new SerializableRunnable("createPR") {
+      @Override
+      public void run() {
+        // Honor the argument instead of always creating the parentRegion constant.
+        createPR(regionName);
+        // Assert on the region itself; asserting on "region != null" boxes a Boolean, which is
+        // never null, so that check could not fail.
+        assertNotNull(getCache().getRegion(regionName));
+      }
+    };
+  }
+
+  /**
+   * Builds a runnable that puts {@code numBuckets} entries ({@code keyBase + i} mapped to
+   * {@code "value_" + i}) into the named region, logging before and after the puts.
+   *
+   * @param regionName name of the region to operate on
+   * @param numBuckets number of puts to perform
+   * @return a runnable named "doConcOps"
+   */
+  protected SerializableRunnable getConcOpsRunnable(String regionName, int numBuckets) {
+    return new SerializableRunnable("doConcOps") {
+      @Override
+      public void run() {
+        final Region<Object, Object> target = getCache().getRegion(regionName);
+        System.out.println("In concOps on " + regionName);
+        for (int index = 0; index < numBuckets; index++) {
+          target.put(keyBase + index, "value_" + index);
+        }
+        System.out.println("Done with " + numBuckets + " ops on " + regionName);
+      }
+    };
+  }
+
+  /**
+   * Runnable that calls {@code doRebalance(false, manager)} on this VM's resource manager and
+   * prints the completed bucket transfers, primary transfers and bucket creates, first for each
+   * region in the results and then as overall totals.
+   */
+  protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") {
+    @Override
+    public void run() {
+      ResourceManager manager = getCache().getResourceManager();
+      System.out.println("starting rebalance");
+      RebalanceResults results = doRebalance(false, manager);
+
+      // One line per region; StringBuilder suffices because the buffer never leaves this thread.
+      for (PartitionRebalanceInfo prInfo : results.getPartitionRebalanceDetails()) {
+        StringBuilder detail = new StringBuilder();
+        detail.append(prInfo.getRegionPath());
+        detail.append(" bucketTransfers = ").append(prInfo.getBucketTransfersCompleted());
+        detail.append(" primaryTransfers = ").append(prInfo.getPrimaryTransfersCompleted());
+        detail.append(" bucketCreates = ").append(prInfo.getBucketCreatesCompleted());
+        System.out.println("rebalance info for " + detail);
+      }
+
+      StringBuilder totals = new StringBuilder();
+      totals.append(" bucketTransfers = ").append(results.getTotalBucketTransfersCompleted());
+      totals.append(" primaryTransfers = ").append(results.getTotalPrimaryTransfersCompleted());
+      totals.append(" bucketCreates = ").append(results.getTotalBucketCreatesCompleted());
+      System.out.println("finished rebalance : overall rebalance results =  " + totals);
+    }
+  };
+
+  /**
+   * Builds a runnable that creates a parallel async event queue with id "parallelQueue" and
+   * attaches it to the given region through the region's attributes mutator.
+   *
+   * @param parentRegion name of the existing region that receives the queue id
+   * @return a runnable named "createAEQ"
+   */
+  protected SerializableRunnable getCreateAEQRunnable(final String parentRegion) {
+    return new SerializableRunnable("createAEQ") {
+      @Override
+      public void run() throws Exception {
+        // Obtain the cache the same way the other runnables in this class do.
+        final Cache cache = getCache();
+
+        // Create an async event listener that doesn't dispatch anything: processEvents sleeps
+        // briefly and always returns false.
+        cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true)
+            .create("parallelQueue", new AsyncEventListener() {
+              @Override
+              public void close() {}
+
+              @Override
+              public boolean processEvents(List<AsyncEvent> events) {
+                try {
+                  Thread.sleep(100);
+                } catch (InterruptedException e) {
+                  // Restore the interrupt flag so the calling thread can still observe it.
+                  Thread.currentThread().interrupt();
+                  return false;
+                }
+                return false;
+              }
+            });
+        Region region = cache.getRegion(parentRegion);
+        region.getAttributesMutator().addAsyncEventQueueId("parallelQueue");
+      }
+    };
+  }
+
+  /**
+   * Builds a runnable that creates a partitioned region colocated with {@code parentRegion},
+   * using one redundant copy and recovery delay / startup recovery delay both set to -1.
+   *
+   * @param parentRegion name of the region to colocate with
+   * @param regionName name of the colocated region to create
+   * @param ratts unused: the runnable builds its own attributes inside the target VM; the
+   *        parameter is kept so existing callers continue to compile
+   * @return a runnable named "createdColocatedRegion"
+   */
+  protected SerializableRunnable getCreateColocatedRegionRunnable(final String parentRegion,
+      final String regionName, final RegionAttributes ratts) {
+    return new SerializableRunnable("createdColocatedRegion") {
+      @Override
+      public void run() throws Exception {
+        Cache cache = getCache();
+
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(1);
+        paf.setRecoveryDelay(-1);
+        paf.setStartupRecoveryDelay(-1);
+        paf.setColocatedWith(parentRegion);
+
+        AttributesFactory attr = new AttributesFactory();
+        attr.setPartitionAttributes(paf.create());
+        // Named differently from the "ratts" parameter so it no longer shadows it.
+        RegionAttributes colocatedAttributes = attr.create();
+
+        System.out.println("creating colocatedRegion " + regionName);
+        cache.createRegion(regionName, colocatedAttributes);
+        System.out.println("created colocatedRegion " + regionName);
+      }
+    };
+  }
+
+  @Test
+  public void testRebalanceDuringColocatedPRCreation() throws Exception {
+    // Starts a rebalance in vm0, then runs puts and colocated-region creation alongside it.
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the parent region in 3 of the 4 VMs; vm3 creates it later, after data is loaded.
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // Add data using only numBuckets / 2 keys (so that not all buckets are defined).
+    int numBuckets = vm0.invoke(() -> 
getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // Give rebalance some work to do by adding another vm.
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+    // NOTE(review): ratts built below is handed to getCreateColocatedRegionRunnable, which builds its own attributes.
+    // Attributes for the colocated regions that are created while rebalance is in progress.
+    AttributesFactory attr = new AttributesFactory();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(1);
+    paf.setRecoveryDelay(-1);
+    paf.setStartupRecoveryDelay(-1);
+    paf.setColocatedWith(parentRegion);
+    PartitionAttributes prAttr = paf.create();
+    attr.setPartitionAttributes(prAttr);
+    RegionAttributes ratts = attr.create();
+
+    for (int i = 0; i < 1; i++) { // NOTE(review): bound is 1, so the body runs exactly once
+      // Start the rebalance first; everything below is launched before it is joined.
+      AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+      String colocatedRegionName = colocatedRegionBase + i;
+      SerializableRunnable createColocatedRegion =
+          getCreateColocatedRegionRunnable(parentRegion, colocatedRegionName, 
ratts);
+      // Puts on the parent region from vm1, launched asynchronously.
+      AsyncInvocation aiConcOps = 
vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets));
+      // Create the colocated region in all four VMs, also asynchronously.
+      AsyncInvocation ai1 = vm0.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai2 = vm1.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai3 = vm2.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai4 = vm3.invokeAsync(createColocatedRegion);
+      // Join each invocation and surface any exception it recorded.
+      aiConcOps.join();
+      aiConcOps.checkException();
+
+      aiRebalancer.join();
+      aiRebalancer.checkException();
+
+      ai1.join();
+      ai2.join();
+      ai3.join();
+      ai4.join();
+
+      ai1.checkException();
+      ai2.checkException();
+      ai3.checkException();
+      ai4.checkException();
+
+    }
+    // Rebalance once more after all of the invocations above have been joined.
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+
+  }
+
+  @Test
+  public void testRebalanceDuringAEQCreation() throws Exception {
+    // Starts a rebalance in vm0, then creates an async event queue in every VM alongside it.
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the parent region in 3 of the 4 VMs; vm3 creates it later, after data is loaded.
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // Add data using only numBuckets / 2 keys (so that not all buckets are defined).
+    int numBuckets = vm0.invoke(() -> 
getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // Give rebalance some work to do by adding another vm.
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+    // Start the rebalance first; the AEQ creations below are launched before it is joined.
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+    // Create the queue and attach it to the parent region in all four VMs, asynchronously.
+    AsyncInvocation ai1 = vm0.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai2 = vm1.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai3 = vm2.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai4 = vm3.invokeAsync(getCreateAEQRunnable(parentRegion));
+    // Join each invocation and surface any exception it recorded.
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+    ai4.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+    ai4.checkException();
+    // Rebalance once more after all of the invocations above have been joined.
+    aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+  }
+
   @Test
   public void testRecoverRedundancySimulation() {
     recoverRedundancy(true);

-- 
To stop receiving notification emails like this one, please contact
ladyva...@apache.org.

Reply via email to