GEODE-124: Address DLock related review comments

* Do not release Rebalance Dlock after each task. This means a member holds the
  lock for its lifetime
* Increment rebalance attempt stat only if lock is acquired

https://reviews.apache.org/r/36662/


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/65e2cc69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/65e2cc69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/65e2cc69

Branch: refs/heads/feature/GEODE-77
Commit: 65e2cc69ed3fadeebc5e65a831a51a8c006c34e2
Parents: 1683361
Author: Ashvin Agrawal <ash...@apache.org>
Authored: Tue Jul 28 17:01:03 2015 -0700
Committer: Ashvin Agrawal <ash...@apache.org>
Committed: Wed Jul 29 14:11:57 2015 -0700

----------------------------------------------------------------------
 .../gemfire/cache/util/AutoBalancer.java        |  47 ++++----
 .../cache/util/AutoBalancerJUnitTest.java       | 118 +++++++++----------
 2 files changed, 75 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65e2cc69/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git 
a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
 
b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
index ef795b0..72a2f95 100644
--- 
a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
+++ 
b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
@@ -10,6 +10,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.logging.log4j.Logger;
 import org.quartz.CronExpression;
@@ -122,6 +123,7 @@ public class AutoBalancer implements Declarable {
   private OOBAuditor auditor = new SizeBasedOOBAuditor();
   private TimeProvider clock = new SystemClockTimeProvider();
   private CacheOperationFacade cacheFacade = new GeodeCacheFacade();
+  private AtomicBoolean isLockAcquired = new AtomicBoolean(false);
 
   private static final Logger logger = LogService.getLogger();
 
@@ -241,28 +243,32 @@ public class AutoBalancer implements Declarable {
 
     @Override
     public void execute() {
+      if (!isLockAcquired.get()) {
+        synchronized (isLockAcquired) {
+          if (!isLockAcquired.get()) {
+            boolean result = cacheFacade.acquireAutoBalanceLock();
+            if (result) {
+              isLockAcquired.set(true);
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug("Another member owns auto-balance lock. Skip this 
attempt to rebalance the cluster");
+              }
+              return;
+            }
+          }
+        }
+      }
+
       cacheFacade.incrementAttemptCounter();
-      boolean result = cacheFacade.acquireAutoBalanceLock();
+      boolean result = needsRebalancing();
       if (!result) {
         if (logger.isDebugEnabled()) {
-          logger.debug("Another member owns auto-balance lock. Skip this 
attempt to rebalance the cluster");
+          logger.debug("Rebalancing is not needed");
         }
         return;
       }
 
-      try {
-        result = needsRebalancing();
-        if (!result) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing is not needed");
-          }
-          return;
-        }
-
-        cacheFacade.rebalance();
-      } finally {
-        cacheFacade.releaseAutoBalanceLock();
-      }
+      cacheFacade.rebalance();
     }
 
     /**
@@ -402,6 +408,7 @@ public class AutoBalancer implements Declarable {
       return cache;
     }
 
+    @Override
     public boolean acquireAutoBalanceLock() {
       DistributedLockService dls = getDLS();
 
@@ -412,14 +419,6 @@ public class AutoBalancer implements Declarable {
       return result;
     }
 
-    public void releaseAutoBalanceLock() {
-      DistributedLockService dls = getDLS();
-      dls.unlock(AUTO_BALANCER_LOCK);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Successfully released auto-balance ownership");
-      }
-    }
-
     @Override
     public DistributedLockService getDLS() {
       GemFireCacheImpl cache = getCache();
@@ -459,8 +458,6 @@ public class AutoBalancer implements Declarable {
   interface CacheOperationFacade {
     boolean acquireAutoBalanceLock();
 
-    void releaseAutoBalanceLock();
-
     DistributedLockService getDLS();
 
     void rebalance();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65e2cc69/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
 
b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
index db225cb..93680f6 100644
--- 
a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
+++ 
b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
@@ -124,14 +124,6 @@ public class AutoBalancerJUnitTest {
       {
         oneOf(mockCacheFacade).acquireAutoBalanceLock();
         will(returnValue(false));
-        oneOf(mockCacheFacade).incrementAttemptCounter();
-        will(new CustomAction("increment stat") {
-          public Object invoke(Invocation invocation) throws Throwable {
-            new GeodeCacheFacade().incrementAttemptCounter();
-            return null;
-          }
-        });
-        allowing(mockCacheFacade);
       }
     });
 
@@ -139,7 +131,7 @@ public class AutoBalancerJUnitTest {
     AutoBalancer balancer = new AutoBalancer();
     balancer.setCacheOperationFacade(mockCacheFacade);
     balancer.getOOBAuditor().execute();
-    assertEquals(1, 
cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+    assertEquals(0, 
cache.getResourceManager().getStats().getAutoRebalanceAttempts());
   }
 
   @Test
@@ -224,79 +216,77 @@ public class AutoBalancerJUnitTest {
   }
 
   @Test
-  public void testReleaseLock() throws InterruptedException {
+  public void testLockStatExecuteInSequence() throws InterruptedException {
     cache = createBasicCache();
 
-    final AtomicBoolean success = new AtomicBoolean(false);
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        DistributedLockService dls = new GeodeCacheFacade().getDLS();
-        success.set(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
-      }
-    });
-    thread.start();
-    thread.join();
-
-    final DistributedLockService mockDLS = 
mockContext.mock(DistributedLockService.class);
+    final CacheOperationFacade mockCacheFacade = 
mockContext.mock(CacheOperationFacade.class);
+    final Sequence sequence = mockContext.sequence("sequence");
     mockContext.checking(new Expectations() {
       {
-        oneOf(mockDLS).unlock(AutoBalancer.AUTO_BALANCER_LOCK);
-        will(new CustomAction("release lock") {
-          @Override
-          public Object invoke(Invocation invocation) throws Throwable {
-            DistributedLockService dls = new GeodeCacheFacade().getDLS();
-            dls.unlock(AutoBalancer.AUTO_BALANCER_LOCK);
-            return null;
-          }
-        });
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        inSequence(sequence);
+        will(returnValue(true));
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        inSequence(sequence);
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        inSequence(sequence);
+        will(returnValue(0L));
       }
     });
 
-    success.set(true);
-    thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
-          public DistributedLockService getDLS() {
-            return mockDLS;
-          }
-        };
-        try {
-          cacheFacade.releaseAutoBalanceLock();
-        } catch (Exception e) {
-          success.set(false);
-        }
-      }
-    });
-    thread.start();
-    thread.join();
-    assertTrue(success.get());
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.getOOBAuditor().execute();
   }
 
   @Test
-  public void testLockSequence() throws InterruptedException {
+  public void testReusePreAcquiredLock() throws InterruptedException {
     cache = createBasicCache();
 
     final CacheOperationFacade mockCacheFacade = 
mockContext.mock(CacheOperationFacade.class);
-    final Sequence lockingSequence = mockContext.sequence("lockingSequence");
     mockContext.checking(new Expectations() {
       {
         oneOf(mockCacheFacade).acquireAutoBalanceLock();
-        inSequence(lockingSequence);
         will(returnValue(true));
-        oneOf(mockCacheFacade).releaseAutoBalanceLock();
-        inSequence(lockingSequence);
-        allowing(mockCacheFacade);
+        exactly(2).of(mockCacheFacade).incrementAttemptCounter();
+        exactly(2).of(mockCacheFacade).getTotalTransferSize();
+        will(returnValue(0L));
       }
     });
 
     AutoBalancer balancer = new AutoBalancer();
     balancer.setCacheOperationFacade(mockCacheFacade);
     balancer.getOOBAuditor().execute();
+    balancer.getOOBAuditor().execute();
   }
 
   @Test
+  public void testAcquireLockAfterReleasedRemotely() throws 
InterruptedException {
+    cache = createBasicCache();
+    
+    final CacheOperationFacade mockCacheFacade = 
mockContext.mock(CacheOperationFacade.class);
+    final Sequence sequence = mockContext.sequence("sequence");
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        inSequence(sequence);
+        will(returnValue(false));
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        inSequence(sequence);
+        will(returnValue(true));
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        will(returnValue(0L));
+      }
+    });
+    
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.getOOBAuditor().execute();
+    balancer.getOOBAuditor().execute();
+  }
+  
+  @Test
   public void testFailExecuteIfLockedElsewhere() throws InterruptedException {
     cache = createBasicCache();
 
@@ -305,7 +295,6 @@ public class AutoBalancerJUnitTest {
       {
         oneOf(mockCacheFacade).acquireAutoBalanceLock();
         will(returnValue(false));
-        oneOf(mockCacheFacade).incrementAttemptCounter();
         // no other methods, rebalance, will be called
       }
     });
@@ -333,7 +322,6 @@ public class AutoBalancerJUnitTest {
         will(returnValue(true));
         never(mockCacheFacade).rebalance();
         oneOf(mockCacheFacade).incrementAttemptCounter();
-        oneOf(mockCacheFacade).releaseAutoBalanceLock();
       }
     });
 
@@ -424,21 +412,21 @@ public class AutoBalancerJUnitTest {
   @Test
   public void testOOBWhenAboveThresholdAndMin() {
     final long totalSize = 1000L;
-    
+
     final Map<PartitionedRegion, InternalPRInfo> details = new HashMap<>();
     final CacheOperationFacade mockCacheFacade = 
mockContext.mock(CacheOperationFacade.class);
     mockContext.checking(new Expectations() {
       {
         allowing(mockCacheFacade).getRegionMemberDetails();
         will(returnValue(details));
-        
+
         // first run
         oneOf(mockCacheFacade).getTotalDataSize(details);
         will(returnValue(totalSize));
         oneOf(mockCacheFacade).getTotalTransferSize();
         // twice threshold
         will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * 
totalSize / 100) * 2));
-        
+
         // second run
         oneOf(mockCacheFacade).getTotalDataSize(details);
         will(returnValue(totalSize));
@@ -447,21 +435,21 @@ public class AutoBalancerJUnitTest {
         will(returnValue(2 * totalSize));
       }
     });
-    
+
     AutoBalancer balancer = new AutoBalancer();
     balancer.setCacheOperationFacade(mockCacheFacade);
     Properties config = getBasicConfig();
     config.put(AutoBalancer.SIZE_MINIMUM, "10");
     balancer.init(config);
     SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) 
balancer.getOOBAuditor();
-    
+
     // first run
     assertTrue(auditor.needsRebalancing());
-    
+
     // second run
     assertTrue(auditor.needsRebalancing());
   }
-  
+
   @Test
   public void testInitializerCacheXML() {
     String configStr = "<cache 
xmlns=\"http://schema.pivotal.io/gemfire/cache\";                          "

Reply via email to