Repository: geode
Updated Branches:
  refs/heads/develop c4dbeb802 -> 46f4194ab


GEODE-2690: Submitting the jobs in chunks to the threadpool

        * Per bucket flush operations are now chunked into groups of 10 and 
then submitted to the thread pool.
        * The next chuck of jobs is submitted to the thread pool only after the 
first group had completed its task.
        * This was to prevent a situation where multiple wait for flush calls 
on a region with thousand of buckets will result in spawing a lot of threads.
        * The operating system may not be able to handle the large number of 
threads and throw an unable to create native thread exception.

        This closes #430


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/46f4194a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/46f4194a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/46f4194a

Branch: refs/heads/develop
Commit: 46f4194ab90912e9170bd36cda897efd40b677dd
Parents: c4dbeb8
Author: nabarun <n...@pivotal.io>
Authored: Fri Mar 24 17:42:10 2017 -0700
Committer: nabarun <n...@pivotal.io>
Committed: Mon Mar 27 16:32:35 2017 -0700

----------------------------------------------------------------------
 ...ParallelGatewaySenderFlushedCoordinator.java | 82 +++++++++++++-------
 1 file changed, 56 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/46f4194a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
index 528d3bb..c5945a6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -20,7 +20,6 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.*;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator;
-import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +29,7 @@ import java.util.concurrent.*;
 
 public class WaitUntilParallelGatewaySenderFlushedCoordinator
     extends WaitUntilGatewaySenderFlushedCoordinator {
+  final static private int CALLABLES_CHUNK_SIZE = 10;
 
   public 
WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender,
       long timeout, TimeUnit unit, boolean initiator) {
@@ -53,39 +53,35 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinator
     // Submit local callables for execution
     ExecutorService service = 
this.sender.getDistributionManager().getWaitingThreadPool();
     List<Future<Boolean>> callableFutures = new ArrayList<>();
-    for (Callable<Boolean> callable : callables) {
-      callableFutures.add(service.submit(callable));
-    }
+    int callableCount = 0;
     if (logger.isDebugEnabled()) {
-      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created 
and submitted "
+      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created 
and being submitted "
           + callables.size() + " callables=" + callables);
     }
-
-    // Process local future results
-    for (Future<Boolean> future : callableFutures) {
-      boolean singleBucketResult = false;
-      try {
-        singleBucketResult = future.get();
-      } catch (ExecutionException e) {
-        exceptionToThrow = e.getCause();
+    for (Callable<Boolean> callable : callables) {
+      callableFutures.add(service.submit(callable));
+      callableCount++;
+      if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 || callableCount == 
callables.size()) {
+        CallablesChunkResults callablesChunkResults =
+            new CallablesChunkResults(localResult, exceptionToThrow, 
callableFutures).invoke();
+        localResult = callablesChunkResults.getLocalResult();
+        exceptionToThrow = callablesChunkResults.getExceptionToThrow();
+        if (logger.isDebugEnabled()) {
+          logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Processed local result= "
+              + localResult + "; exceptionToThrow= " + exceptionToThrow);
+        }
+        if (exceptionToThrow != null) {
+          throw exceptionToThrow;
+        }
       }
-      localResult = localResult && singleBucketResult;
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Processed local result="
-          + localResult + "; exceptionToThrow=" + exceptionToThrow);
     }
 
     // Return the full result
-    if (exceptionToThrow == null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Returning full result="
-            + (localResult));
-      }
-      return localResult;
-    } else {
-      throw exceptionToThrow;
+    if (logger.isDebugEnabled()) {
+      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Returning full result="
+          + (localResult));
     }
+    return localResult;
   }
 
   protected List<WaitUntilBucketRegionQueueFlushedCallable> 
buildWaitUntilBucketRegionQueueFlushedCallables(
@@ -178,4 +174,38 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinator
     }
   }
 
+  private class CallablesChunkResults {
+    private boolean localResult;
+    private Throwable exceptionToThrow;
+    private List<Future<Boolean>> callableFutures;
+
+    public CallablesChunkResults(boolean localResult, Throwable 
exceptionToThrow,
+        List<Future<Boolean>> callableFutures) {
+      this.localResult = localResult;
+      this.exceptionToThrow = exceptionToThrow;
+      this.callableFutures = callableFutures;
+    }
+
+    public boolean getLocalResult() {
+      return localResult;
+    }
+
+    public Throwable getExceptionToThrow() {
+      return exceptionToThrow;
+    }
+
+    public CallablesChunkResults invoke() throws InterruptedException {
+      for (Future<Boolean> future : callableFutures) {
+        boolean singleBucketResult = false;
+        try {
+          singleBucketResult = future.get();
+        } catch (ExecutionException e) {
+          exceptionToThrow = e.getCause();
+        }
+        localResult = localResult && singleBucketResult;
+      }
+      callableFutures.clear();
+      return this;
+    }
+  }
 }

Reply via email to