GEODE-124: Address DLock-related review comments * Do not release the rebalance DLock after each task. This means a member that acquires the lock holds it for its lifetime. * Increment the rebalance attempt stat only if the lock is acquired.
https://reviews.apache.org/r/36662/ Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/65e2cc69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/65e2cc69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/65e2cc69 Branch: refs/heads/feature/GEODE-77 Commit: 65e2cc69ed3fadeebc5e65a831a51a8c006c34e2 Parents: 1683361 Author: Ashvin Agrawal <ash...@apache.org> Authored: Tue Jul 28 17:01:03 2015 -0700 Committer: Ashvin Agrawal <ash...@apache.org> Committed: Wed Jul 29 14:11:57 2015 -0700 ---------------------------------------------------------------------- .../gemfire/cache/util/AutoBalancer.java | 47 ++++---- .../cache/util/AutoBalancerJUnitTest.java | 118 +++++++++---------- 2 files changed, 75 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65e2cc69/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java ---------------------------------------------------------------------- diff --git a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java index ef795b0..72a2f95 100644 --- a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java +++ b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java @@ -10,6 +10,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.Logger; import org.quartz.CronExpression; @@ -122,6 +123,7 @@ public class AutoBalancer implements Declarable { private OOBAuditor auditor = new SizeBasedOOBAuditor(); private TimeProvider 
clock = new SystemClockTimeProvider(); private CacheOperationFacade cacheFacade = new GeodeCacheFacade(); + private AtomicBoolean isLockAcquired = new AtomicBoolean(false); private static final Logger logger = LogService.getLogger(); @@ -241,28 +243,32 @@ public class AutoBalancer implements Declarable { @Override public void execute() { + if (!isLockAcquired.get()) { + synchronized (isLockAcquired) { + if (!isLockAcquired.get()) { + boolean result = cacheFacade.acquireAutoBalanceLock(); + if (result) { + isLockAcquired.set(true); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); + } + return; + } + } + } + } + cacheFacade.incrementAttemptCounter(); - boolean result = cacheFacade.acquireAutoBalanceLock(); + boolean result = needsRebalancing(); if (!result) { if (logger.isDebugEnabled()) { - logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); + logger.debug("Rebalancing is not needed"); } return; } - try { - result = needsRebalancing(); - if (!result) { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing is not needed"); - } - return; - } - - cacheFacade.rebalance(); - } finally { - cacheFacade.releaseAutoBalanceLock(); - } + cacheFacade.rebalance(); } /** @@ -402,6 +408,7 @@ public class AutoBalancer implements Declarable { return cache; } + @Override public boolean acquireAutoBalanceLock() { DistributedLockService dls = getDLS(); @@ -412,14 +419,6 @@ public class AutoBalancer implements Declarable { return result; } - public void releaseAutoBalanceLock() { - DistributedLockService dls = getDLS(); - dls.unlock(AUTO_BALANCER_LOCK); - if (logger.isDebugEnabled()) { - logger.debug("Successfully released auto-balance ownership"); - } - } - @Override public DistributedLockService getDLS() { GemFireCacheImpl cache = getCache(); @@ -459,8 +458,6 @@ public class AutoBalancer implements Declarable { interface 
CacheOperationFacade { boolean acquireAutoBalanceLock(); - void releaseAutoBalanceLock(); - DistributedLockService getDLS(); void rebalance(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65e2cc69/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java index db225cb..93680f6 100644 --- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java +++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java @@ -124,14 +124,6 @@ public class AutoBalancerJUnitTest { { oneOf(mockCacheFacade).acquireAutoBalanceLock(); will(returnValue(false)); - oneOf(mockCacheFacade).incrementAttemptCounter(); - will(new CustomAction("increment stat") { - public Object invoke(Invocation invocation) throws Throwable { - new GeodeCacheFacade().incrementAttemptCounter(); - return null; - } - }); - allowing(mockCacheFacade); } }); @@ -139,7 +131,7 @@ public class AutoBalancerJUnitTest { AutoBalancer balancer = new AutoBalancer(); balancer.setCacheOperationFacade(mockCacheFacade); balancer.getOOBAuditor().execute(); - assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts()); + assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts()); } @Test @@ -224,79 +216,77 @@ public class AutoBalancerJUnitTest { } @Test - public void testReleaseLock() throws InterruptedException { + public void testLockStatExecuteInSequence() throws InterruptedException { cache = createBasicCache(); - final AtomicBoolean success = new AtomicBoolean(false); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - DistributedLockService dls = new 
GeodeCacheFacade().getDLS(); - success.set(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1)); - } - }); - thread.start(); - thread.join(); - - final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class); + final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); + final Sequence sequence = mockContext.sequence("sequence"); mockContext.checking(new Expectations() { { - oneOf(mockDLS).unlock(AutoBalancer.AUTO_BALANCER_LOCK); - will(new CustomAction("release lock") { - @Override - public Object invoke(Invocation invocation) throws Throwable { - DistributedLockService dls = new GeodeCacheFacade().getDLS(); - dls.unlock(AutoBalancer.AUTO_BALANCER_LOCK); - return null; - } - }); + oneOf(mockCacheFacade).acquireAutoBalanceLock(); + inSequence(sequence); + will(returnValue(true)); + oneOf(mockCacheFacade).incrementAttemptCounter(); + inSequence(sequence); + oneOf(mockCacheFacade).getTotalTransferSize(); + inSequence(sequence); + will(returnValue(0L)); } }); - success.set(true); - thread = new Thread(new Runnable() { - @Override - public void run() { - CacheOperationFacade cacheFacade = new GeodeCacheFacade() { - public DistributedLockService getDLS() { - return mockDLS; - } - }; - try { - cacheFacade.releaseAutoBalanceLock(); - } catch (Exception e) { - success.set(false); - } - } - }); - thread.start(); - thread.join(); - assertTrue(success.get()); + AutoBalancer balancer = new AutoBalancer(); + balancer.setCacheOperationFacade(mockCacheFacade); + balancer.getOOBAuditor().execute(); } @Test - public void testLockSequence() throws InterruptedException { + public void testReusePreAcquiredLock() throws InterruptedException { cache = createBasicCache(); final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); - final Sequence lockingSequence = mockContext.sequence("lockingSequence"); mockContext.checking(new Expectations() { { oneOf(mockCacheFacade).acquireAutoBalanceLock(); - 
inSequence(lockingSequence); will(returnValue(true)); - oneOf(mockCacheFacade).releaseAutoBalanceLock(); - inSequence(lockingSequence); - allowing(mockCacheFacade); + exactly(2).of(mockCacheFacade).incrementAttemptCounter(); + exactly(2).of(mockCacheFacade).getTotalTransferSize(); + will(returnValue(0L)); } }); AutoBalancer balancer = new AutoBalancer(); balancer.setCacheOperationFacade(mockCacheFacade); balancer.getOOBAuditor().execute(); + balancer.getOOBAuditor().execute(); } @Test + public void testAcquireLockAfterReleasedRemotely() throws InterruptedException { + cache = createBasicCache(); + + final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); + final Sequence sequence = mockContext.sequence("sequence"); + mockContext.checking(new Expectations() { + { + oneOf(mockCacheFacade).acquireAutoBalanceLock(); + inSequence(sequence); + will(returnValue(false)); + oneOf(mockCacheFacade).acquireAutoBalanceLock(); + inSequence(sequence); + will(returnValue(true)); + oneOf(mockCacheFacade).incrementAttemptCounter(); + oneOf(mockCacheFacade).getTotalTransferSize(); + will(returnValue(0L)); + } + }); + + AutoBalancer balancer = new AutoBalancer(); + balancer.setCacheOperationFacade(mockCacheFacade); + balancer.getOOBAuditor().execute(); + balancer.getOOBAuditor().execute(); + } + + @Test public void testFailExecuteIfLockedElsewhere() throws InterruptedException { cache = createBasicCache(); @@ -305,7 +295,6 @@ public class AutoBalancerJUnitTest { { oneOf(mockCacheFacade).acquireAutoBalanceLock(); will(returnValue(false)); - oneOf(mockCacheFacade).incrementAttemptCounter(); // no other methods, rebalance, will be called } }); @@ -333,7 +322,6 @@ public class AutoBalancerJUnitTest { will(returnValue(true)); never(mockCacheFacade).rebalance(); oneOf(mockCacheFacade).incrementAttemptCounter(); - oneOf(mockCacheFacade).releaseAutoBalanceLock(); } }); @@ -424,21 +412,21 @@ public class AutoBalancerJUnitTest { @Test public void 
testOOBWhenAboveThresholdAndMin() { final long totalSize = 1000L; - + final Map<PartitionedRegion, InternalPRInfo> details = new HashMap<>(); final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); mockContext.checking(new Expectations() { { allowing(mockCacheFacade).getRegionMemberDetails(); will(returnValue(details)); - + // first run oneOf(mockCacheFacade).getTotalDataSize(details); will(returnValue(totalSize)); oneOf(mockCacheFacade).getTotalTransferSize(); // twice threshold will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) * 2)); - + // second run oneOf(mockCacheFacade).getTotalDataSize(details); will(returnValue(totalSize)); @@ -447,21 +435,21 @@ public class AutoBalancerJUnitTest { will(returnValue(2 * totalSize)); } }); - + AutoBalancer balancer = new AutoBalancer(); balancer.setCacheOperationFacade(mockCacheFacade); Properties config = getBasicConfig(); config.put(AutoBalancer.SIZE_MINIMUM, "10"); balancer.init(config); SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor(); - + // first run assertTrue(auditor.needsRebalancing()); - + // second run assertTrue(auditor.needsRebalancing()); } - + @Test public void testInitializerCacheXML() { String configStr = "<cache xmlns=\"http://schema.pivotal.io/gemfire/cache\" "