Repository: geode Updated Branches: refs/heads/develop c4dbeb802 -> 46f4194ab
GEODE-2690: Submitting the jobs in chunks to the threadpool * Per bucket flush operations are now chunked into groups of 10 and then submitted to the thread pool. * The next chuck of jobs is submitted to the thread pool only after the first group had completed its task. * This was to prevent a situation where multiple wait for flush calls on a region with thousand of buckets will result in spawing a lot of threads. * The operating system may not be able to handle the large number of threads and throw an unable to create native thread exception. This closes #430 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/46f4194a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/46f4194a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/46f4194a Branch: refs/heads/develop Commit: 46f4194ab90912e9170bd36cda897efd40b677dd Parents: c4dbeb8 Author: nabarun <n...@pivotal.io> Authored: Fri Mar 24 17:42:10 2017 -0700 Committer: nabarun <n...@pivotal.io> Committed: Mon Mar 27 16:32:35 2017 -0700 ---------------------------------------------------------------------- ...ParallelGatewaySenderFlushedCoordinator.java | 82 +++++++++++++------- 1 file changed, 56 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/46f4194a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index 528d3bb..c5945a6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -20,7 +20,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator; -import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +29,7 @@ import java.util.concurrent.*; public class WaitUntilParallelGatewaySenderFlushedCoordinator extends WaitUntilGatewaySenderFlushedCoordinator { + final static private int CALLABLES_CHUNK_SIZE = 10; public WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender, long timeout, TimeUnit unit, boolean initiator) { @@ -53,39 +53,35 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator // Submit local callables for execution ExecutorService service = this.sender.getDistributionManager().getWaitingThreadPool(); List<Future<Boolean>> callableFutures = new ArrayList<>(); - for (Callable<Boolean> callable : callables) { - callableFutures.add(service.submit(callable)); - } + int callableCount = 0; if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created and submitted " + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created and being submitted " + callables.size() + " callables=" + callables); } - - // Process local future results - for (Future<Boolean> future : callableFutures) { - boolean singleBucketResult = false; - try { - singleBucketResult = future.get(); - } catch (ExecutionException e) { - exceptionToThrow = e.getCause(); + for (Callable<Boolean> callable : callables) { + callableFutures.add(service.submit(callable)); + callableCount++; + if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 || callableCount == callables.size()) { + CallablesChunkResults callablesChunkResults = + new CallablesChunkResults(localResult, exceptionToThrow, callableFutures).invoke(); + localResult = callablesChunkResults.getLocalResult(); + exceptionToThrow = callablesChunkResults.getExceptionToThrow(); + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed local result= " + + localResult + "; exceptionToThrow= " + exceptionToThrow); + } + if (exceptionToThrow != null) { + throw exceptionToThrow; + } } - localResult = localResult && singleBucketResult; - } - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed local result=" - + localResult + "; exceptionToThrow=" + exceptionToThrow); } // Return the full result - if (exceptionToThrow == null) { - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Returning full result=" - + (localResult)); - } - return localResult; - } else { - throw exceptionToThrow; + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Returning full result=" + + (localResult)); } + return localResult; } protected List<WaitUntilBucketRegionQueueFlushedCallable> buildWaitUntilBucketRegionQueueFlushedCallables( @@ -178,4 +174,38 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator } } + private class CallablesChunkResults { + private boolean localResult; + private Throwable exceptionToThrow; + private List<Future<Boolean>> callableFutures; + + public CallablesChunkResults(boolean localResult, Throwable exceptionToThrow, + List<Future<Boolean>> callableFutures) { + this.localResult = localResult; + this.exceptionToThrow = exceptionToThrow; + this.callableFutures = callableFutures; + } + + public boolean getLocalResult() { + return localResult; + } + + public Throwable getExceptionToThrow() { + return exceptionToThrow; + } + + public CallablesChunkResults invoke() throws InterruptedException { + for (Future<Boolean> future : callableFutures) { + boolean singleBucketResult = false; + try { + singleBucketResult = future.get(); + } catch (ExecutionException e) { + exceptionToThrow = e.getCause(); + } + localResult = localResult && singleBucketResult; + } + callableFutures.clear(); + return this; + } + } }