Enforce waitUntilFlushed timeout per bucket (as well as overall). Rework JUnit tests for new implementation.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d0c44de5 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d0c44de5 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d0c44de5 Branch: refs/heads/feature/GEODE-2852 Commit: d0c44de5f0bc2d1de276fc25c5cecaf61908fa7c Parents: 6eb100d Author: Lynn Hughes-Godfrey <[email protected]> Authored: Tue May 2 13:33:49 2017 -0700 Committer: Lynn Hughes-Godfrey <[email protected]> Committed: Tue May 2 13:33:49 2017 -0700 ---------------------------------------------------------------------- ...ParallelGatewaySenderFlushedCoordinator.java | 46 ++++++++------ ...atewaySenderFlushedCoordinatorJUnitTest.java | 67 +++++++++++--------- 2 files changed, 64 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/d0c44de5/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index 3bb220f..42ce68c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -23,8 +23,10 @@ import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordina import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; public class WaitUntilParallelGatewaySenderFlushedCoordinator @@ -46,28 +48,30 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator sender.getCancelCriterion().checkCancelInProgress(null); } - // Create callables for local buckets - List<WaitUntilBucketRegionQueueFlushedCallable> callables = - buildWaitUntilBucketRegionQueueFlushedCallables(pr); - - // Submit local callables for execution ExecutorService service = this.sender.getDistributionManager().getWaitingThreadPool(); List<Future<Boolean>> callableFutures = new ArrayList<>(); int callableCount = 0; - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created and being submitted " - + callables.size() + " callables=" + callables); - } - long endTime = System.nanoTime() + unit.toNanos(timeout); - for (Callable<Boolean> callable : callables) { + long nanosRemaining = unit.toNanos(timeout); + long endTime = System.nanoTime() + nanosRemaining; + Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr); + for (BucketRegion br : localBucketRegions) { // timeout exceeded, do not submit more callables, return localResult false if (System.nanoTime() >= endTime) { localResult = false; break; } + // create and submit callable with updated timeout + Callable<Boolean> callable = createWaitUntilBucketRegionQueueFlushedCallable( + (BucketRegionQueue) br, nanosRemaining, TimeUnit.NANOSECONDS); + if (logger.isDebugEnabled()) { + logger.debug( + "WaitUntilParallelGatewaySenderFlushedCoordinator: Submitting callable for bucket " + + br.getId() + " callable=" + callable + " nanosRemaining=" + nanosRemaining); + } callableFutures.add(service.submit(callable)); callableCount++; - if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 || callableCount == callables.size()) { + if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 + || callableCount == localBucketRegions.size()) { CallablesChunkResults callablesChunkResults = new CallablesChunkResults(localResult, exceptionToThrow, callableFutures).invoke(); localResult = callablesChunkResults.getLocalResult(); @@ -80,6 +84,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator throw exceptionToThrow; } } + nanosRemaining = endTime - System.nanoTime(); } // Return the full result @@ -90,16 +95,17 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator return localResult; } - protected List<WaitUntilBucketRegionQueueFlushedCallable> buildWaitUntilBucketRegionQueueFlushedCallables( - PartitionedRegion pr) { - List<WaitUntilBucketRegionQueueFlushedCallable> callables = new ArrayList<>(); + protected Set<BucketRegion> getLocalBucketRegions(PartitionedRegion pr) { + Set<BucketRegion> localBucketRegions = new HashSet<BucketRegion>(); if (pr.isDataStore()) { - for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { - callables.add(new WaitUntilBucketRegionQueueFlushedCallable((BucketRegionQueue) br, - this.timeout, this.unit)); - } + localBucketRegions = pr.getDataStore().getAllLocalBucketRegions(); } - return callables; + return localBucketRegions; + } + + protected WaitUntilBucketRegionQueueFlushedCallable createWaitUntilBucketRegionQueueFlushedCallable( + BucketRegionQueue br, long timeout, TimeUnit unit) { + return new WaitUntilBucketRegionQueueFlushedCallable(br, timeout, unit); } public static class WaitUntilBucketRegionQueueFlushedCallable implements Callable<Boolean> { http://git-wip-us.apache.org/repos/asf/geode/blob/d0c44de5/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java index d957f91..77902f8 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.wan.parallel; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinatorJUnitTest; @@ -23,7 +25,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; @@ -32,12 +36,14 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @Category(IntegrationTest.class) public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest extends WaitUntilGatewaySenderFlushedCoordinatorJUnitTest { private PartitionedRegion region; + private BucketRegionQueue brq; protected void createGatewaySender() { super.createGatewaySender(); @@ -46,6 +52,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest doReturn(queue).when(this.sender).getQueue(); this.region = mock(PartitionedRegion.class); doReturn(this.region).when(queue).getRegion(); + this.brq = mock(BucketRegionQueue.class); } protected AbstractGatewaySenderEventProcessor getEventProcessor() { @@ -56,69 +63,71 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest @Test public void testWaitUntilParallelGatewaySenderFlushedSuccessfulNotInitiator() throws Throwable { + long timeout = 5000; + TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, - TimeUnit.MILLISECONDS, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); - doReturn(getSuccessfulCallables(true)).when(coordinatorSpy) - .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); + doReturn(getCallableResult(true)).when(coordinatorSpy) + .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any()); boolean result = coordinatorSpy.waitUntilFlushed(); assertTrue(result); } @Test public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulNotInitiator() throws Throwable { + long timeout = 5000; + TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, - TimeUnit.MILLISECONDS, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); - doReturn(getUnsuccessfulCallables()).when(coordinatorSpy) - .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); + doReturn(getCallableResult(false)).when(coordinatorSpy) + .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any()); boolean result = coordinatorSpy.waitUntilFlushed(); assertFalse(result); } @Test public void testWaitUntilParallelGatewaySenderFlushedSuccessfulInitiator() throws Throwable { + long timeout = 5000; + TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, - TimeUnit.MILLISECONDS, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); - doReturn(getSuccessfulCallables(true)).when(coordinatorSpy) - .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); + doReturn(getCallableResult(true)).when(coordinatorSpy) + .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any()); boolean result = coordinatorSpy.waitUntilFlushed(); assertTrue(result); } @Test public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulInitiator() throws Throwable { + long timeout = 5000; + TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, - TimeUnit.MILLISECONDS, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); - doReturn(getSuccessfulCallables(false)).when(coordinatorSpy) - .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); + doReturn(getCallableResult(false)).when(coordinatorSpy) + .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any()); boolean result = coordinatorSpy.waitUntilFlushed(); assertFalse(result); } - private List<WaitUntilBucketRegionQueueFlushedCallable> getSuccessfulCallables( - boolean expectedResult) throws Exception { - List callables = new ArrayList(); + private WaitUntilBucketRegionQueueFlushedCallable getCallableResult(boolean expectedResult) + throws Exception { WaitUntilBucketRegionQueueFlushedCallable callable = mock(WaitUntilBucketRegionQueueFlushedCallable.class); when(callable.call()).thenReturn(expectedResult); - callables.add(callable); - return callables; + return callable; } - private List<WaitUntilBucketRegionQueueFlushedCallable> getUnsuccessfulCallables() - throws Exception { - List callables = new ArrayList(); - WaitUntilBucketRegionQueueFlushedCallable callable = - mock(WaitUntilBucketRegionQueueFlushedCallable.class); - when(callable.call()).thenReturn(false); - callables.add(callable); - return callables; + private Set<BucketRegionQueue> getLocalBucketRegions() { + Set<BucketRegionQueue> localBucketRegions = new HashSet<BucketRegionQueue>(); + localBucketRegions.add(this.brq); + return localBucketRegions; } }
