Decoupling NettyClient from flow control policy

Summary: This diff refactors NettyClient by decoupling the flow control
mechanism from it. During the refactoring, several performance and
correctness bugs were found, thanks to the improved readability of the
refactored code.

Test Plan:
mvn clean verify
Tested large jobs and the output was correct
Tested large jobs and saw no performance degradation for code using the old
mechanism

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D56367


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b90b59d2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b90b59d2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b90b59d2

Branch: refs/heads/trunk
Commit: b90b59d2b5064782c501dfca5a1497f1ef513a6a
Parents: b5284cd
Author: Sergey Edunov <[email protected]>
Authored: Tue Apr 26 15:34:42 2016 -0700
Committer: Sergey Edunov <[email protected]>
Committed: Tue Apr 26 15:34:42 2016 -0700

----------------------------------------------------------------------
 findbugs-exclude.xml                            |   6 +-
 .../apache/giraph/bsp/CentralizedService.java   |   8 +
 .../giraph/bsp/CentralizedServiceMaster.java    |   8 -
 .../giraph/bsp/CentralizedServiceWorker.java    |  15 -
 .../org/apache/giraph/comm/MasterClient.java    |   8 +
 .../org/apache/giraph/comm/MasterServer.java    |   9 +
 .../org/apache/giraph/comm/WorkerClient.java    |   6 +
 .../org/apache/giraph/comm/WorkerServer.java    |   8 +
 .../flow_control/CreditBasedFlowControl.java    | 300 +++++++++
 .../giraph/comm/flow_control/FlowControl.java   |  83 +++
 .../comm/flow_control/NoOpFlowControl.java      |  67 ++
 .../comm/flow_control/StaticFlowControl.java    | 163 +++++
 .../giraph/comm/flow_control/package-info.java  |  21 +
 .../apache/giraph/comm/netty/NettyClient.java   | 665 +++----------------
 .../giraph/comm/netty/NettyMasterClient.java    |   6 +
 .../giraph/comm/netty/NettyMasterServer.java    |   6 +
 .../apache/giraph/comm/netty/NettyServer.java   |  11 +
 .../giraph/comm/netty/NettyWorkerClient.java    |  10 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |   6 +
 .../handler/MasterRequestServerHandler.java     |  25 +-
 .../netty/handler/RequestServerHandler.java     |  45 +-
 .../handler/WorkerRequestServerHandler.java     |  28 +-
 .../apache/giraph/master/BspServiceMaster.java  |   1 +
 .../org/apache/giraph/master/MasterInfo.java    |   4 +-
 .../giraph/utils/AdjustableSemaphore.java       |  59 ++
 .../apache/giraph/worker/BspServiceWorker.java  |  12 +-
 .../org/apache/giraph/comm/ConnectionTest.java  |   5 +
 .../apache/giraph/comm/RequestFailureTest.java  |   3 +-
 .../org/apache/giraph/comm/RequestTest.java     |  11 +-
 .../apache/giraph/comm/SaslConnectionTest.java  |   1 +
 30 files changed, 921 insertions(+), 679 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 0ab2c73..a7d8124 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -69,11 +69,7 @@
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
   </Match>
   <Match>
-    <Class name="org.apache.giraph.comm.netty.handler.RequestServerHandler"/>
-    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.giraph.comm.netty.handler.ResponseClientHandler"/>
+    <Class name="org.apache.giraph.comm.flow_control.CreditBasedFlowControl"/>
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
   </Match>
   <Match>

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 0cadfb7..93bc4bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -21,6 +21,7 @@ package org.apache.giraph.bsp;
 import java.util.List;
 
 import org.apache.giraph.job.JobProgressTracker;
+import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -60,6 +61,13 @@ public interface CentralizedService<I extends 
WritableComparable,
   List<WorkerInfo> getWorkerInfoList();
 
   /**
+   * Get master info
+   *
+   * @return Master info
+   */
+  MasterInfo getMasterInfo();
+
+  /**
    * Get JobProgressTracker to report progress to
    *
    * @return JobProgressTrackerClient

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index f05a79d..01eb98a 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterGlobalCommHandler;
-import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -54,13 +53,6 @@ public interface CentralizedServiceMaster<I extends 
WritableComparable,
   boolean becomeMaster();
 
   /**
-   * Get master information
-   *
-   * @return Master information
-   */
-  MasterInfo getMasterInfo();
-
-  /**
    * Check all the {@link org.apache.giraph.worker.WorkerInfo} objects to ensure
    * that a minimum number of good workers exists out of the total that have
    * reported.

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 94cd265..c3c16eb 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -25,7 +25,6 @@ import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
-import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
@@ -183,13 +182,6 @@ public interface CentralizedServiceWorker<I extends 
WritableComparable,
       Collection<? extends PartitionOwner> masterSetPartitionOwners);
 
   /**
-   * Get master info
-   *
-   * @return Master info
-   */
-  MasterInfo getMasterInfo();
-
-  /**
    * Get the GraphTaskManager that this service is using.  Vertices need to know
    * this.
    *
@@ -248,13 +240,6 @@ public interface CentralizedServiceWorker<I extends 
WritableComparable,
   GlobalStats getGlobalStats();
 
   /**
-   * Get the number of partitions owned by this worker
-   *
-   * @return number of partitions owned
-   */
-  int getNumPartitionsOwned();
-
-  /**
    * Get input splits handler used during input
    *
    * @return Input splits handler

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index 244dd74..c0a30c3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
 
 import java.io.IOException;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.hadoop.io.Writable;
 
@@ -66,5 +67,12 @@ public interface MasterClient {
    * Closes all connections.
    */
   void closeConnections();
+
+  /**
+   * Get the reference to the flow control policy used for sending requests
+   *
+   * @return reference to the flow control policy
+   */
+  FlowControl getFlowControl();
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
index 4a70979..07f3c45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
+
 import java.net.InetSocketAddress;
 
 /**
@@ -42,4 +44,11 @@ public interface MasterServer {
    * Shuts down.
    */
   void close();
+
+  /**
+   * Inform the server about the flow control policy used in sending requests
+   *
+   * @param flowControl reference to flow control policy
+   */
+  void setFlowControl(FlowControl flowControl);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
index a84a14d..0035a3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.WritableRequest;
 
 import org.apache.giraph.partition.PartitionOwner;
@@ -96,4 +97,9 @@ else[HADOOP_NON_SECURE]*/
    */
   void authenticate() throws IOException;
 /*end[HADOOP_NON_SECURE]*/
+
+  /**
+   * @return the flow control used in sending requests
+   */
+  FlowControl getFlowControl();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index ab7787d..efd9421 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -64,4 +65,11 @@ public interface WorkerServer<I extends WritableComparable,
    * Shuts down.
    */
   void close();
+
+  /**
+   * Inform this server about the flow control used in sending requests
+   *
+   * @param flowControl reference to the flow control policy
+   */
+  void setFlowControl(FlowControl flowControl);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
new file mode 100644
index 0000000..ff82dd1
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.flow_control;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+
+/**
+ * Representation of credit-based flow control policy where each worker has a
+ * constant user-defined credit. The number of open requests to a particular
+ * worker cannot be more than its specified credit.
+ */
+public class CreditBasedFlowControl implements FlowControl {
+  /**
+   * Maximum number of requests we can have per worker without confirmation
+   * (i.e. open requests)
+   */
+  public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
+      new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
+          "Maximum number of requests without confirmation we can have per " +
+              "worker");
+  /** Aggregate number of in-memory unsent requests */
+  public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
+      new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
+          "Maximum number of unsent requests we can keep in memory");
+  /**
+   * Time interval to wait on unsent requests cache until we find a spot in it
+   */
+  public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
+      new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
+          "Time interval to wait on unsent requests cache (in milliseconds)");
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(CreditBasedFlowControl.class);
+
+  /** Waiting interval on unsent requests cache until it frees up */
+  private final int unsentWaitMsecs;
+  /** Waiting interval for checking outstanding requests msecs */
+  private final int waitingRequestMsecs;
+  /** Maximum number of open requests we can have for each worker */
+  private final int maxOpenRequestsPerWorker;
+  /** Total number of unsent, cached requests */
+  private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
+  /**
+   * Map of requests permits per worker. Key in the map is the worker id and the
+   * value is the semaphore to control the number of open requests for the
+   * particular worker. Basically, the number of available permits on this
+   * semaphore is the credit available for the worker.
+   */
+  private final ConcurrentMap<Integer, AdjustableSemaphore>
+      perWorkerOpenRequestMap = Maps.newConcurrentMap();
+  /** Map of unsent cached requests per worker */
+  private final ConcurrentMap<Integer, Deque<WritableRequest>>
+      perWorkerUnsentRequestMap = Maps.newConcurrentMap();
+  /**
+   * Semaphore to control number of cached unsent requests. Maximum number of
+   * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
+   */
+  private final Semaphore unsentRequestPermit;
+  /** Netty client used for sending requests */
+  private final NettyClient nettyClient;
+
+  /**
+   * Constructor
+   * @param conf configuration
+   * @param nettyClient netty client
+   */
+  public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
+                                NettyClient nettyClient) {
+    this.nettyClient = nettyClient;
+    maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+    checkState(maxOpenRequestsPerWorker < 0x4000 &&
+        maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
+        "requests should be in range (0, " + 0x4FFF + ")");
+    unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
+    unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
+    waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+  }
+
+  @Override
+  public void sendRequest(int destTaskId, WritableRequest request) {
+    AdjustableSemaphore openRequestPermit =
+        perWorkerOpenRequestMap.get(destTaskId);
+    // Check if this is the first time sending a request to a worker. If so, we
+    // should add the worker id to the necessary bookkeeping data structures.
+    if (openRequestPermit == null) {
+      openRequestPermit = new AdjustableSemaphore(maxOpenRequestsPerWorker);
+      AdjustableSemaphore temp = 
perWorkerOpenRequestMap.putIfAbsent(destTaskId,
+          openRequestPermit);
+      perWorkerUnsentRequestMap
+          .putIfAbsent(destTaskId, new ArrayDeque<WritableRequest>());
+      if (temp != null) {
+        openRequestPermit = temp;
+      }
+    }
+    // Try to reserve a spot for the request amongst the open requests of
+    // the destination worker.
+    boolean shouldSend = openRequestPermit.tryAcquire();
+    boolean shouldCache = false;
+    while (!shouldSend) {
+      // We should not send the request, and should cache the request instead.
+      // It may be possible that the unsent message cache is also full, so we
+      // should try to acquire a space on the cache, and if there is no extra
+      // space in unsent request cache, we should wait until some space
+      // becomes available. However, it is possible that during the time we are
+      // waiting on the unsent messages cache, actual buffer for open requests
+      // frees up space.
+      try {
+        shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
+            TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("shouldSend: failed " +
+            "while waiting on the unsent request cache to have some more " +
+            "room for extra unsent requests!");
+      }
+      if (shouldCache) {
+        break;
+      }
+      // An open spot may have become available while we were waiting on the
+      // unsent requests cache.
+      shouldSend = openRequestPermit.tryAcquire();
+      if (shouldSend) {
+        break;
+      }
+      // The current thread will be at this point only if it could not make
+      // space amongst open requests for the destination worker and has been
+      // timed-out in trying to acquire a space amongst unsent messages. So,
+      // we should report logs, report progress, and check for request
+      // failures.
+      nettyClient.logAndSanityCheck();
+    }
+    // Either shouldSend == true or shouldCache == true
+    if (shouldCache) {
+      Deque<WritableRequest> unsentRequests =
+          perWorkerUnsentRequestMap.get(destTaskId);
+      // This synchronize block is necessary for the following reason:
+      // Once we are at this point, it means there was no room for this
+      // request to become an open request, hence we have to put it into
+      // unsent cache. Consider the case that since last time we checked if
+      // there is any room for an additional open request so far, all open
+      // requests are delivered and their acknowledgements are also processed.
+      // Now, if we put this request in the unsent cache, it is not being
+      // considered to become an open request, as the only one who checks
+      // on this matter would be the one who receives an acknowledgment for an
+      // open request for the destination worker. So, a lock is necessary
+      // to forcefully serialize the execution if this scenario is about to
+      // happen.
+      synchronized (unsentRequests) {
+        shouldSend = openRequestPermit.tryAcquire();
+        if (!shouldSend) {
+          aggregateUnsentRequests.getAndIncrement();
+          unsentRequests.add(request);
+          return;
+        }
+      }
+      // We found a spot amongst open requests to send this request. So, this
+      // request won't be cached anymore.
+      unsentRequestPermit.release();
+    }
+    nettyClient.doSend(destTaskId, request);
+  }
+
+  /**
+   * Whether response specifies that credit should be ignored
+   *
+   * @param response response received
+   * @return true iff credit should be ignored, false otherwise
+   */
+  private boolean shouldIgnoreCredit(short response) {
+    return ((short) ((response >> 14) & 1)) == 1;
+  }
+
+  /**
+   * Get the credit from a response
+   *
+   * @param response response received
+   * @return credit from the received response
+   */
+  private short getCredit(short response) {
+    return (short) (response & 0x3FFF);
+  }
+
+  /**
+   * Get the response flag from a response
+   *
+   * @param response response received
+   * @return AckSignalFlag coming with the response
+   */
+  @Override
+  public AckSignalFlag getAckSignalFlag(short response) {
+    return AckSignalFlag.values()[(response >> 15) & 1];
+  }
+
+  @Override
+  public short calculateResponse(AckSignalFlag flag, int taskId) {
+    boolean ignoreCredit = nettyClient.masterInvolved(taskId);
+    return (short) ((flag.ordinal() << 15) |
+        ((ignoreCredit ? 1 : 0) << 14) | (maxOpenRequestsPerWorker & 0x3FFF));
+  }
+
+  @Override
+  public void waitAllRequests() {
+    while (true) {
+      synchronized (aggregateUnsentRequests) {
+        if (aggregateUnsentRequests.get() == 0) {
+          break;
+        }
+        try {
+          aggregateUnsentRequests.wait(waitingRequestMsecs);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("waitSomeRequests: failed while " +
+              "waiting on open/cached requests");
+        }
+      }
+      if (aggregateUnsentRequests.get() == 0) {
+        break;
+      }
+      nettyClient.logAndSanityCheck();
+    }
+  }
+
+  @Override
+  public int getNumberOfUnsentRequests() {
+    return aggregateUnsentRequests.get();
+  }
+
+  @Override
+  public void messageAckReceived(int taskId, short response) {
+    boolean shouldIgnoreCredit = shouldIgnoreCredit(response);
+    short credit = getCredit(response);
+    AdjustableSemaphore openRequestPermit =
+        perWorkerOpenRequestMap.get(taskId);
+    openRequestPermit.release();
+    if (!shouldIgnoreCredit) {
+      openRequestPermit.setMaxPermits(credit);
+    }
+    Deque<WritableRequest> requestDeque =
+        perWorkerUnsentRequestMap.get(taskId);
+    // Since we received a response and we changed the credit of the sender
+    // client, we may be able to send some more requests to the sender
+    // client. So, we try to send as many requests as we can to the sender
+    // client.
+    while (true) {
+      WritableRequest request;
+      synchronized (requestDeque) {
+        request = requestDeque.pollFirst();
+        if (request == null) {
+          break;
+        }
+        // See whether the sender client has any unused credit
+        if (!openRequestPermit.tryAcquire()) {
+          requestDeque.offerFirst(request);
+          break;
+        }
+      }
+      // At this point, we have a request, and we reserved a credit for the
+      // sender client. So, we send the request to the client and update
+      // the state.
+      nettyClient.doSend(taskId, request);
+      if (aggregateUnsentRequests.decrementAndGet() == 0) {
+        synchronized (aggregateUnsentRequests) {
+          aggregateUnsentRequests.notifyAll();
+        }
+      }
+      unsentRequestPermit.release();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
new file mode 100644
index 0000000..4eda193
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.flow_control;
+
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+/**
+ * Interface representing flow control policy in sending requests
+ */
+public interface FlowControl {
+  /**
+   * This method is called by a network client for all requests that should be
+   * handled by a *remote* task. All these requests should be controlled and/or
+   * monitored by the flow control policy. The flow control policy may choose to
+   * temporarily hold off from sending to a particular remote task and keep the
+   * request in some cache for later transfer. A flow control mechanism is free
+   * to implement this method as blocking or non-blocking. Note that, a
+   * flow-control policy should adhere to exactly-once semantics, meaning it
+   * should always send one and only one copy of each request that should be
+   * handled by a remote task.
+   *
+   * @param destTaskId id of the worker to send the request to
+   * @param request request to send
+   */
+  void sendRequest(int destTaskId, WritableRequest request);
+
+  /**
+   * Notify the flow control policy that an open request is completed.
+   *
+   * @param taskId id of the task to which the completed open request was sent
+   * @param response the response heard from the task
+   */
+  void messageAckReceived(int taskId, short response);
+
+  /**
+   * Decode the acknowledgement signal from the response after an open request
+   * is completed
+   *
+   * @param response the response heard after completion of a request
+   * @return the Acknowledgement signal decoded from the response
+   */
+  AckSignalFlag getAckSignalFlag(short response);
+
+  /**
+   * There may be requests in possession of the flow control mechanism, as the
+   * mechanism controls whether a task should send a request or not.
+   * Calling this method causes the caller to wait until all requests in
+   * possession of the flow control mechanism are sent out.
+   */
+  void waitAllRequests();
+
+  /**
+   * @return number of unsent requests in possession of the flow control policy
+   */
+  int getNumberOfUnsentRequests();
+
+  /**
+   * Calculate/Build the response to piggyback with acknowledgement
+   *
+   * @param flag indicating the status of processing of the request (whether it
+   *             was a new request or it was a duplicate)
+   * @param taskId id of the task the acknowledgement is for
+   * @return the response to piggyback along with the acknowledgement message
+   */
+  short calculateResponse(AckSignalFlag flag, int taskId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
new file mode 100644
index 0000000..d50fe92
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.flow_control;
+
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+/**
+ * Representation of a flow control policy that does not do anything other than
+ * the vanilla network client request transfer mechanism
+ */
+public class NoOpFlowControl implements FlowControl {
+  /** Netty client */
+  private final NettyClient nettyClient;
+
+  /**
+   * Constructor
+   *
+   * @param nettyClient netty client
+   */
+  public NoOpFlowControl(NettyClient nettyClient) {
+    this.nettyClient = nettyClient;
+  }
+
+  @Override
+  public void sendRequest(int destTaskId, WritableRequest request) {
+    nettyClient.doSend(destTaskId, request);
+  }
+
+  @Override
+  public void messageAckReceived(int taskId, short response) { }
+
+  @Override
+  public AckSignalFlag getAckSignalFlag(short response) {
+    return AckSignalFlag.values()[response];
+  }
+
+  @Override
+  public void waitAllRequests() { }
+
+  @Override
+  public int getNumberOfUnsentRequests() {
+    return 0;
+  }
+
+  @Override
+  public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
+    return (short) alreadyDone.ordinal();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
new file mode 100644
index 0000000..1fc43a7
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.flow_control;
+
+import com.yammer.metrics.core.Counter;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.log4j.Logger;
+
+import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+
+/**
+ * Flow control that caps the number of open requests held by the network
+ * client at a user-defined value, pausing senders once it is exceeded
+ */
+public class StaticFlowControl implements
+    FlowControl, ResetSuperstepMetricsObserver {
+  /** Maximum number of requests without confirmation we should have */
+  public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
+      new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
+          "Maximum number of requests without confirmation we should have");
+  /**
+   * After pausing a thread due to too large number of open requests,
+   * which fraction of these requests need to be closed before we continue
+   */
+  public static final FloatConfOption
+      FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
+      new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
+          0.2f, "Fraction of requests to close before proceeding");
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(StaticFlowControl.class);
+
+  /** Maximum number of requests without confirmation we can have */
+  private final int maxNumberOfOpenRequests;
+  /**
+   * Maximum number of requests that can be open after the pause in order to
+   * proceed
+   */
+  private final int numberOfRequestsToProceed;
+  /** Netty client used for sending requests */
+  private final NettyClient nettyClient;
+  /** Timeout (msecs) of each wait for outstanding requests to complete */
+  private final int waitingRequestMsecs;
+  /** Monitor senders wait on; notified whenever an ack is received */
+  private final Object requestSpotAvailable = new Object();
+  /** Time (msecs) spent waiting on open requests; set in newSuperstep */
+  private Counter timeWaitingOnOpenRequests;
+
+  /**
+   * Constructor; also registers this object as a superstep reset observer
+   *
+   * @param conf configuration the request limits are read from
+   * @param nettyClient netty client used for sending requests
+   */
+  public StaticFlowControl(ImmutableClassesGiraphConfiguration conf,
+                           NettyClient nettyClient) {
+    this.nettyClient = nettyClient;
+    maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
+    numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
+        (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("StaticFlowControl: Limit number of open requests to " +
+          maxNumberOfOpenRequests + " and proceed when <= " +
+          numberOfRequestsToProceed);
+    }
+    waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry metrics) {
+    timeWaitingOnOpenRequests = metrics.getCounter(
+        MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
+  }
+
+  @Override
+  public void sendRequest(int destTaskId, WritableRequest request) {
+    nettyClient.doSend(destTaskId, request);
+    if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) {
+      long startTime = System.currentTimeMillis();
+      waitSomeRequests();
+      timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
+    }
+  }
+
+  /**
+   * Block until at most numberOfRequestsToProceed requests remain open. Each
+   * wait is bounded by waitingRequestMsecs, after which
+   * nettyClient.logAndSanityCheck() is called before re-checking the count.
+   */
+  private void waitSomeRequests() {
+    while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
+      // Re-check while holding the monitor, then wait with a timeout
+      synchronized (requestSpotAvailable) {
+        if (nettyClient.getNumberOfOpenRequests() <=
+            numberOfRequestsToProceed) {
+          break;
+        }
+        try {
+          requestSpotAvailable.wait(waitingRequestMsecs);
+        } catch (InterruptedException e) { // interrupt status not restored
+          throw new IllegalStateException("waitSomeRequests: Got unexpected " +
+              "InterruptedException", e);
+        }
+      }
+      nettyClient.logAndSanityCheck();
+    }
+  }
+
+  @Override
+  public void messageAckReceived(int taskId, short response) {
+    synchronized (requestSpotAvailable) {
+      requestSpotAvailable.notifyAll(); // wake senders in waitSomeRequests
+    }
+  }
+
+  @Override
+  public AckSignalFlag getAckSignalFlag(short response) {
+    return AckSignalFlag.values()[response]; // response is the flag ordinal
+  }
+
+  @Override
+  public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
+    return (short) alreadyDone.ordinal(); // taskId is not used
+  }
+
+  @Override
+  public void waitAllRequests() {
+    // This flow control policy does not keep any unsent request. All the open
+    // requests are in possession of the network client, so the network client
+    // will wait for them to complete. Thus, this flow control policy does not
+    // need to do anything regarding flushing the remaining requests.
+  }
+
+  @Override
+  public int getNumberOfUnsentRequests() {
+    return 0; // nothing is buffered by this policy
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/package-info.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/package-info.java
new file mode 100644
index 0000000..5aa4ae9
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for flow-control policies.
+ */
+package org.apache.giraph.comm.flow_control;

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 15f0502..863449a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -18,8 +18,10 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+import org.apache.giraph.comm.flow_control.NoOpFlowControl;
+import org.apache.giraph.comm.flow_control.StaticFlowControl;
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
@@ -34,15 +36,10 @@ import 
org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 /*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.BooleanConfOption;
-import org.apache.giraph.conf.FloatConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.MetricNames;
-import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
-import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.utils.PipelineUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ThreadUtils;
@@ -53,22 +50,17 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
-import com.yammer.metrics.core.Counter;
 
 /*if_not[HADOOP_NON_SECURE]*/
 import java.io.IOException;
 /*end[HADOOP_NON_SECURE]*/
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -100,19 +92,15 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THRE
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+
 /**
  * Netty client for sending requests.  Thread-safe.
  */
-public class NettyClient implements ResetSuperstepMetricsObserver {
+public class NettyClient {
   /** Do we have a limit on number of open requests we can have */
   public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
       new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
           "Whether to have a limit on number of open requests or not");
-  /** Maximum number of requests without confirmation we should have */
-  public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
-      new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
-          "Maximum number of requests without confirmation we should have");
-
   /**
    * Do we have a limit on number of open requests we can have for each worker.
    * Note that if this option is enabled, Netty will not keep more than a
@@ -126,32 +114,6 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
       new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
           "Whether to have a limit on number of open requests for each worker" 
+
               "or not");
-  /**
-   * Maximum number of requests we can have per worker without confirmation
-   * (i.e. open requests)
-   */
-  public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
-      new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
-          "Maximum number of requests without confirmation we can have per " +
-              "worker");
-  /** Aggregate number of in-memory unsent requests */
-  public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
-      new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
-          "Maximum number of unsent requests we can keep in memory");
-  /**
-   * Time interval to wait on unsent requests cahce until we find a spot in it
-   */
-  public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
-      new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
-          "Time interval to wait on unsent requests cache (in milliseconds)");
-  /**
-   * After pausing a thread due to too large number of open requests,
-   * which fraction of these requests need to be closed before we continue
-   */
-  public static final FloatConfOption
-  FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
-      new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
-          0.2f, "Fraction of requests to close before proceeding");
   /** Maximum number of requests to list (for debugging) */
   public static final int MAX_REQUESTS_TO_LIST = 10;
   /**
@@ -200,50 +162,8 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   private final int sendBufferSize;
   /** Receive buffer size */
   private final int receiveBufferSize;
-  /** Do we have a limit on number of open requests */
-  private final boolean limitNumberOfOpenRequests;
   /** Warn if request size is bigger than the buffer size by this factor */
   private final float requestSizeWarningThreshold;
-  /** Maximum number of requests without confirmation we can have */
-  private final int maxNumberOfOpenRequests;
-  /** Wait interval on unsent requests cache until it frees up */
-  private final int unsentWaitMsecs;
-  /**
-   * Maximum number of requests that can be open after the pause in order to
-   * proceed
-   */
-  private final int numberOfRequestsToProceed;
-  /** Do we have a limit on the number of open requests per worker */
-  private final boolean limitOpenRequestsPerWorker;
-  /** Maximum number of open requests we can have for each worker */
-  private final int maxOpenRequestsPerWorker;
-  /** Total number of open requests */
-  private final AtomicInteger aggregateOpenRequests;
-  /** Total number of unsent, cached requests */
-  private final AtomicInteger aggregateUnsentRequests;
-  /**
-   * Map of requests per worker. Key in the map is the worker id and the value
-   * in the map is a pair (X, Y) where:
-   *   Y = map of client request ids to request information for a particular
-   *       worker,
-   *   X = the semaphore to control the number of open requests for the
-   *       the particular worker. Basically, the number of available permits
-   *       on this semaphore is the credit available for the worker (in credit-
-   *       based control-flow mechanism), and the number of permits already
-   *       taken from the semaphore should be equal to the size of Y (unless a
-   *       transition is happening).
-   */
-  private final ConcurrentMap<Integer,
-      Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>>
-      perWorkerOpenRequestMap;
-  /** Map of unsent, cached requests per worker */
-  private final ConcurrentMap<Integer, Deque<WritableRequest>>
-      perWorkerUnsentRequestMap;
-  /**
-   * Semaphore to control number of cached unsent request. Maximum number of
-   * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
-   */
-  private final Semaphore unsentRequestPermit;
   /** Maximum number of connection failures */
   private final int maxConnectionFailures;
   /** Maximum number of milliseconds for a request */
@@ -278,8 +198,8 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
    */
   private final LogOnErrorChannelFutureListener logErrorListener =
       new LogOnErrorChannelFutureListener();
-  /** Counter for time spent waiting on too many open requests */
-  private Counter timeWaitingOnOpenRequests;
+  /** Flow control policy used */
+  private final FlowControl flowControl;
 
   /**
    * Only constructor
@@ -302,44 +222,19 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
     this.requestSizeWarningThreshold =
         GiraphConstants.REQUEST_SIZE_WARNING_THRESHOLD.get(conf);
 
-    limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
+    boolean limitNumberOfOpenRequests = 
LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
+    boolean limitOpenRequestsPerWorker =
+        LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    checkState(!limitNumberOfOpenRequests || !limitOpenRequestsPerWorker,
+        "NettyClient: it is not allowed to have both limitations on the " +
+            "number of total open requests, and on the number of open " +
+            "requests per worker!");
     if (limitNumberOfOpenRequests) {
-      maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
-      numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
-          (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
-      if (LOG.isInfoEnabled()) {
-        LOG.info("NettyClient: Limit number of open requests to " +
-            maxNumberOfOpenRequests + " and proceed when <= " +
-            numberOfRequestsToProceed);
-      }
-    } else {
-      maxNumberOfOpenRequests = -1;
-      numberOfRequestsToProceed = 0;
-    }
-    limitOpenRequestsPerWorker = LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
-    /* Maximum number of unsent requests we can have in total */
-    if (limitOpenRequestsPerWorker) {
-      checkState(!limitNumberOfOpenRequests, "NettyClient: it is not allowed " 
+
-          "to have both limitation on the number of total open requests, and " 
+
-          "limitation on the number of open requests per worker!");
-      maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
-      checkState(maxOpenRequestsPerWorker > 0x3FFF ||
-          maxOpenRequestsPerWorker < 1, "NettyClient: max number of open " +
-          "requests should be in range [1, " + 0x3FFF + "]");
-      aggregateOpenRequests = new AtomicInteger(0);
-      aggregateUnsentRequests = new AtomicInteger(0);
-      perWorkerOpenRequestMap = Maps.newConcurrentMap();
-      perWorkerUnsentRequestMap = Maps.newConcurrentMap();
-      unsentRequestPermit = new 
Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
-      unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
+      flowControl = new StaticFlowControl(conf, this);
+    } else if (limitOpenRequestsPerWorker) {
+      flowControl = new CreditBasedFlowControl(conf, this);
     } else {
-      maxOpenRequestsPerWorker = -1;
-      aggregateOpenRequests = new AtomicInteger(-1);
-      aggregateUnsentRequests = new AtomicInteger(-1);
-      perWorkerOpenRequestMap = null;
-      perWorkerUnsentRequestMap = null;
-      unsentRequestPermit = null;
-      unsentWaitMsecs = -1;
+      flowControl = new NoOpFlowControl(this);
     }
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
@@ -351,8 +246,6 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
     clientRequestIdRequestInfoMap =
         new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
 
-    GiraphMetrics.get().addSuperstepResetObserver(this);
-
     handlerToUseExecutionGroup =
         NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
     useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
@@ -478,10 +371,15 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
         });
   }
 
-  @Override
-  public void newSuperstep(SuperstepMetricsRegistry metrics) {
-    timeWaitingOnOpenRequests = metrics.getCounter(
-        MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
+  /**
+   * Whether master task is involved in the communication with a given client
+   *
+   * @param clientId id of the task on the other end of the communication
+   * @return true if master is on one end of the communication
+   */
+  public boolean masterInvolved(int clientId) {
+    return myTaskInfo.getTaskId() == MasterInfo.MASTER_TASK_ID ||
+        clientId == MasterInfo.MASTER_TASK_ID;
   }
 
   /**
@@ -545,15 +443,6 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
             "address " + address);
       }
 
-      if (limitOpenRequestsPerWorker &&
-          !perWorkerOpenRequestMap.containsKey(taskId)) {
-        perWorkerOpenRequestMap.put(taskId,
-            new ImmutablePair<>(
-                new AdjustableSemaphore(maxOpenRequestsPerWorker),
-                Maps.<ClientRequestId, RequestInfo>newConcurrentMap()));
-        perWorkerUnsentRequestMap
-            .put(taskId, new ArrayDeque<WritableRequest>());
-      }
       if (addressChannelMap.containsKey(address)) {
         continue;
       }
@@ -808,134 +697,25 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   }
 
   /**
-   * Send a request to a remote server (should be already connected)
+   * Send a request to a remote server honoring the flow control mechanism
+   * (should be already connected)
    *
    * @param destTaskId Destination task id
    * @param request Request to send
    */
-  public void sendWritableRequest(int destTaskId,
-      WritableRequest request) {
-    ConcurrentMap<ClientRequestId, RequestInfo> requestMap;
-    if (limitOpenRequestsPerWorker) {
-      requestMap = reserveSpotForRequest(destTaskId, request);
-      if (requestMap == null) {
-        return;
-      }
-    } else {
-      requestMap = clientRequestIdRequestInfoMap;
-    }
-
-    doSend(destTaskId, request, requestMap);
-
-    if (limitNumberOfOpenRequests &&
-        requestMap.size() > maxNumberOfOpenRequests) {
-      long startTime = System.currentTimeMillis();
-      waitSomeRequests(numberOfRequestsToProceed);
-      timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
-    } else if (limitOpenRequestsPerWorker) {
-      aggregateOpenRequests.getAndIncrement();
-    }
-  }
-
-  /**
-   * Try to reserve a spot in the open requests map for a specific worker. If
-   * no spot is available in the open requests map, try to reserve a spot in 
the
-   * unsent requests cache. If there is no spot there too, wait until a spot
-   * becomes available either in the open requests map or unsent requests 
cache.
-   * This method is used when credit-based flow-control is enabled.
-   *
-   * @param destTaskId Destination task id
-   * @param request Request to send
-   * @return Open request map of the given task, or null if there was no spot 
in
-   *         the open request map, but there was a spot in unsent requests 
cache
-   */
-  private ConcurrentMap<ClientRequestId, RequestInfo>
-  reserveSpotForRequest(int destTaskId, WritableRequest request) {
-    Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>
-        pair = perWorkerOpenRequestMap.get(destTaskId);
-    ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight();
-    final AdjustableSemaphore openRequestPermit = pair.getLeft();
-    // Try to reserve a spot for the request amongst the open requests of
-    // the destination worker.
-    boolean shouldSend = openRequestPermit.tryAcquire();
-    boolean shouldCache = false;
-    while (!shouldSend) {
-      // We should not send the request, and should cache the request instead.
-      // It may be possible that the unsent message cache is also full, so we
-      // should try to acquire a space on the cache, and if there is no extra
-      // space in unsent request cache, we should wait until some space
-      // become available. However, it is possible that during the time we are
-      // waiting on the unsent messages cache, actual buffer for open requests
-      // frees up space.
-      try {
-        shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
-            TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("sendWritableRequest: failed " +
-            "while waiting on the unsent request cache to have some more " +
-            "room for extra unsent requests!");
-      }
-      if (shouldCache) {
-        break;
-      }
-      // We may have an open spot in the meantime that we were waiting on the
-      // unsent requests.
-      shouldSend = openRequestPermit.tryAcquire();
-      if (shouldSend) {
-        break;
-      }
-      // The current thread will be at this point only if it could not make
-      // space amongst open requests for the destination worker and has been
-      // timed-out in trying to acquire a space amongst unsent messages. So,
-      // we should report logs, report progress, and check for request
-      // failures.
-      logInfoAboutOpenRequests(-1);
-      context.progress();
-      checkRequestsForProblems();
-    }
-    // Either shouldSend == true or shouldCache == true
-    if (shouldCache) {
-      Deque<WritableRequest> unsentRequests =
-          perWorkerUnsentRequestMap.get(destTaskId);
-      // This synchronize block is necessary for the following reason:
-      // Once we are at this point, it means there was no room for this
-      // request to become an open request, hence we have to put it into
-      // unsent cache. Consider the case that since last time we checked if
-      // there is any room for an additional open request so far, all open
-      // requests are delivered and their acknowledgements are also processed.
-      // Now, if we put this request in the unsent cache, it is not being
-      // considered to become an open request, as the only one who checks
-      // on this matter would be the one who receives an acknowledgment for an
-      // open request for the destination worker. So, a lock is necessary
-      // to forcefully serialize the execution if this scenario is about to
-      // happen.
-      synchronized (unsentRequests) {
-        shouldSend = openRequestPermit.tryAcquire();
-        if (!shouldSend) {
-          aggregateUnsentRequests.getAndIncrement();
-          unsentRequests.add(request);
-          return null;
-        }
-      }
-    }
-    return requestMap;
+  public void sendWritableRequest(int destTaskId, WritableRequest request) {
+    flowControl.sendRequest(destTaskId, request);
   }
 
   /**
-   * Sends a request.
+   * Actual send of a request.
    *
    * @param destTaskId destination to send the request to
    * @param request request itself
-   * @param requestMap the map that should hold the request, used for tracking
-   *                   the request later
    */
-  private void doSend(int destTaskId, WritableRequest request,
-      ConcurrentMap<ClientRequestId, RequestInfo> requestMap) {
+  public void doSend(int destTaskId, WritableRequest request) {
     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
-    if ((limitOpenRequestsPerWorker &&
-        aggregateOpenRequests.get() == 0) ||
-        (!limitOpenRequestsPerWorker &&
-            requestMap.isEmpty())) {
+    if (clientRequestIdRequestInfoMap.isEmpty()) {
       inboundByteCounter.resetAll();
       outboundByteCounter.resetAll();
     }
@@ -954,7 +734,7 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
         addressRequestIdGenerator.getNextRequestId(remoteServer));
       ClientRequestId clientRequestId =
         new ClientRequestId(destTaskId, request.getRequestId());
-      RequestInfo oldRequestInfo = requestMap.putIfAbsent(
+      RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
         clientRequestId, newRequestInfo);
       if (oldRequestInfo != null) {
         throw new IllegalStateException("sendWritableRequest: Impossible to " +
@@ -974,55 +754,6 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   }
 
   /**
-   * Get the response flag from a response
-   *
-   * @param response response received
-   * @return AckSignalFlag coming with the response
-   */
-  public static AckSignalFlag getAckSignalFlag(short response) {
-    return AckSignalFlag.values()[(response >> 15) & 1];
-  }
-
-  /**
-   * Whether response specifies that credit should be ignored
-   *
-   * @param response response received
-   * @return true iff credit should be ignored, false otherwise
-   */
-  public static boolean shouldIgnoreCredit(short response) {
-    return ((short) ((response >> 14) & 1)) == 1;
-  }
-
-  /**
-   * Get the credit from a response
-   *
-   * @param response response received
-   * @return credit from the received response
-   */
-  public static short getCredit(short response) {
-    return (short) (response & 0x3FFF);
-  }
-
-  /**
-   * Calculate/Build a response for given flag and credit
-   *
-   * @param flag AckSignalFlag that should be sent with the response
-   * @param ignoreCredit whether this response specified to ignore the credit
-   * @param credit if credit is not ignored, what should be the max credit
-   * @return the response to be sent out along with the acknowledgement
-   */
-  public static short calculateResponse(AckSignalFlag flag,
-                                        boolean ignoreCredit, short credit) {
-    if (credit > 0x3FFF || credit < 1) {
-      LOG.warn("calculateResponse: value of the credit is " + credit +
-          " while it should be in range [1, " + 0x3FFF + "]");
-      credit = (short) Math.max((short) Math.min(credit, 0x3FFF), 1);
-    }
-    return (short) ((flag.ordinal() << 15) |
-        ((ignoreCredit ? 1 : 0) << 14) | (credit & 0x3FFF));
-  }
-
-  /**
    * Handle receipt of a message. Called by response handler.
    *
    * @param senderId Id of sender of the message
@@ -1033,16 +764,12 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   public void messageReceived(int senderId, long requestId, short response,
       boolean shouldDrop) {
     if (shouldDrop) {
-      if (!limitOpenRequestsPerWorker) {
-        synchronized (clientRequestIdRequestInfoMap) {
-          clientRequestIdRequestInfoMap.notifyAll();
-        }
+      synchronized (clientRequestIdRequestInfoMap) {
+        clientRequestIdRequestInfoMap.notifyAll();
       }
       return;
     }
-    boolean shouldIgnoreCredit = shouldIgnoreCredit(response);
-    short credit = getCredit(response);
-    AckSignalFlag responseFlag = getAckSignalFlag(response);
+    AckSignalFlag responseFlag = flowControl.getAckSignalFlag(response);
     if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
       LOG.info("messageReceived: Already completed request (taskId = " +
           senderId + ", requestId = " + requestId + ")");
@@ -1050,34 +777,19 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
       throw new IllegalStateException(
           "messageReceived: Got illegal response " + response);
     }
-    RequestInfo requestInfo;
-    int numOpenRequests;
-    if (limitOpenRequestsPerWorker) {
-      requestInfo =
-          processResponse(senderId, requestId, shouldIgnoreCredit, credit);
-      numOpenRequests = aggregateOpenRequests.get();
-      if (numOpenRequests == 0) {
-        synchronized (aggregateOpenRequests) {
-          aggregateOpenRequests.notifyAll();
-        }
-      }
-    } else {
-      requestInfo = clientRequestIdRequestInfoMap
-          .remove(new ClientRequestId(senderId, requestId));
-      numOpenRequests = clientRequestIdRequestInfoMap.size();
-    }
+    RequestInfo requestInfo = clientRequestIdRequestInfoMap
+        .remove(new ClientRequestId(senderId, requestId));
     if (requestInfo == null) {
       LOG.info("messageReceived: Already received response for (taskId = " +
           senderId + ", requestId = " + requestId + ")");
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
-            requestInfo + ".  Waiting on " + numOpenRequests + " requests");
+            requestInfo + ".  Waiting on " +
+            clientRequestIdRequestInfoMap.size() + " requests");
       }
-    }
-
-    if (!limitOpenRequestsPerWorker) {
-      // Help waitSomeRequests() to finish faster
+      flowControl.messageAckReceived(senderId, response);
+      // Help #waitAllRequests() to finish faster
       synchronized (clientRequestIdRequestInfoMap) {
         clientRequestIdRequestInfoMap.notifyAll();
       }
@@ -1085,77 +797,27 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
   }
 
   /**
-   * Process a response for credit based flow-control. Open up a spot in the
-   * open request map of the sender, and send from unsent requests cache as much
-   * as there is open space in the open request map.
-   *
-   * @param senderId the sender from whom we got the response
-   * @param requestId the id of the request the response belongs to
-   * @param ignoreCredit whether credit based mechanism should take action
-   * @param credit what should be the credit if credit based mechanism is
-   *               enabled
-   * @return Info of the request the response belongs to
+   * Ensure all the request sent so far are complete. Periodically check the
+   * state of current open requests. If there is an issue in any of them,
+   * re-send the request.
    */
-  private RequestInfo processResponse(int senderId, long requestId,
-                                      boolean ignoreCredit, short credit) {
-    Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>
-        pair = perWorkerOpenRequestMap.get(senderId);
-    ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight();
-    RequestInfo requestInfo =
-        requestMap.remove(new ClientRequestId(senderId, requestId));
-    AdjustableSemaphore openRequestPermit = pair.getLeft();
-    if (requestInfo != null) {
-      openRequestPermit.release();
-      aggregateOpenRequests.getAndDecrement();
-      if (!ignoreCredit) {
-        openRequestPermit.setMaxPermits(credit);
-      }
-      Deque<WritableRequest> requestDeque =
-          perWorkerUnsentRequestMap.get(senderId);
-      // Since we received a response and we changed the credit of the sender
-      // client, we may be able to send some more requests to the sender
-      // client. So, we try to send as much request as we can to the sender
-      // client.
-      while (true) {
-        WritableRequest request;
-        synchronized (requestDeque) {
-          request = requestDeque.pollFirst();
-          if (request == null) {
-            break;
-          }
-          // See whether the sender client has any unused credit
-          if (!openRequestPermit.tryAcquire()) {
-            requestDeque.offerFirst(request);
-            break;
-          }
+  public void waitAllRequests() {
+    flowControl.waitAllRequests();
+    checkState(flowControl.getNumberOfUnsentRequests() == 0);
+    while (clientRequestIdRequestInfoMap.size() > 0) {
+      // Wait for requests to complete for some time
+      synchronized (clientRequestIdRequestInfoMap) {
+        if (clientRequestIdRequestInfoMap.size() == 0) {
+          break;
         }
-        // At this point, we have a request, and we reserved a credit for the
-        // sender client. So, we send the request to the client and update
-        // the state.
-        doSend(senderId, request, requestMap);
-        if (aggregateUnsentRequests.decrementAndGet() == 0) {
-          synchronized (aggregateUnsentRequests) {
-            aggregateUnsentRequests.notifyAll();
-          }
+        try {
+          clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("waitAllRequests: Got unexpected " +
+              "InterruptedException", e);
         }
-        aggregateOpenRequests.getAndIncrement();
-        unsentRequestPermit.release();
       }
-    }
-    return requestInfo;
-  }
-
-  /**
-   * Ensure all the request sent so far are complete.
-   *
-   * @throws InterruptedException
-   */
-  public void waitAllRequests() {
-    if (limitOpenRequestsPerWorker) {
-      waitSomeRequests(aggregateUnsentRequests);
-      waitSomeRequests(aggregateOpenRequests);
-    } else {
-      waitSomeRequests(0);
+      logAndSanityCheck();
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("waitAllRequests: Finished all requests. " +
@@ -1165,119 +827,43 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
   }
 
   /**
-   * Wait for some requests to complete. The aggregate number of such requests
-   * is given. Periodically check the state of current open requests. If there
-   * is an issue in any of them, re-send the request.
-   *
-   * @param aggregateRequests object keeping aggregate number of requests to
-   *                          wait for
+   * Log information about the requests and check for problems in requests
    */
-  private void waitSomeRequests(final AtomicInteger aggregateRequests) {
-    while (true) {
-      synchronized (aggregateRequests) {
-        if (aggregateRequests.get() == 0) {
-          break;
-        }
-        try {
-          aggregateRequests.wait(waitingRequestMsecs);
-        } catch (InterruptedException e) {
-          throw new IllegalStateException("waitSomeRequests: failed while " +
-              "waiting on open/cached requests");
-        }
-        if (aggregateRequests.get() == 0) {
-          break;
-        }
-      }
-      logInfoAboutOpenRequests(-1);
-      context.progress();
-      checkRequestsForProblems();
-    }
-  }
-
-  /**
-   * Ensure that at most maxOpenRequests are not complete.  Periodically,
-   * check the state of every request.  If we find the connection failed,
-   * re-establish it and re-send the request.
-   *
-   * @param maxOpenRequests Maximum number of requests which can be not
-   *                        complete
-   */
-  private void waitSomeRequests(int maxOpenRequests) {
-    while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {
-      // Wait for requests to complete for some time
-      logInfoAboutOpenRequests(maxOpenRequests);
-      synchronized (clientRequestIdRequestInfoMap) {
-        if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {
-          break;
-        }
-        try {
-          clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);
-        } catch (InterruptedException e) {
-          LOG.error("waitSomeRequests: Got unexpected InterruptedException", e);
-        }
-      }
-      // Make sure that waiting doesn't kill the job
-      context.progress();
-
-      checkRequestsForProblems();
-    }
+  public void logAndSanityCheck() {
+    logInfoAboutOpenRequests();
+    // Make sure that waiting doesn't kill the job
+    context.progress();
+    checkRequestsForProblems();
   }
 
   /**
    * Log the status of open requests.
-   *
-   * @param maxOpenRequests Maximum number of requests which can be not complete
    */
-  private void logInfoAboutOpenRequests(int maxOpenRequests) {
-    int numOpenRequests = limitOpenRequestsPerWorker ?
-        aggregateOpenRequests.get() :
-        clientRequestIdRequestInfoMap.size();
+  private void logInfoAboutOpenRequests() {
     if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
-          waitingRequestMsecs + " msecs, " + numOpenRequests +
-          " open requests, " + (limitOpenRequestsPerWorker ?
-          (aggregateUnsentRequests.get() + " unsent requests, ") :
-          ("waiting for it to be <= " + maxOpenRequests + ", ")) +
-          inboundByteCounter.getMetrics() + "\n" +
+          waitingRequestMsecs + " msecs, " +
+          clientRequestIdRequestInfoMap.size() +
+          " open requests, " + flowControl.getNumberOfUnsentRequests() +
+          " cached unsent requests, " + inboundByteCounter.getMetrics() + "\n" +
           outboundByteCounter.getMetrics());
 
-      if (numOpenRequests < MAX_REQUESTS_TO_LIST) {
-        if (limitOpenRequestsPerWorker) {
-          for (Pair<AdjustableSemaphore,
-               ConcurrentMap<ClientRequestId, RequestInfo>>
-               pair : perWorkerOpenRequestMap.values()) {
-            for (Map.Entry<ClientRequestId, RequestInfo> request :
-                pair.getRight().entrySet()) {
-              LOG.info("logInfoAboutOpenRequests: Waiting for request " +
-                  request.getKey() + " - " + request.getValue());
-            }
-          }
-        } else {
-          for (Map.Entry<ClientRequestId, RequestInfo> entry :
-              clientRequestIdRequestInfoMap.entrySet()) {
-            LOG.info("logInfoAboutOpenRequests: Waiting for request " +
-                entry.getKey() + " - " + entry.getValue());
-          }
+      if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
+        for (Map.Entry<ClientRequestId, RequestInfo> entry :
+            clientRequestIdRequestInfoMap.entrySet()) {
+          LOG.info("logInfoAboutOpenRequests: Waiting for request " +
+              entry.getKey() + " - " + entry.getValue());
         }
       }
 
       // Count how many open requests each task has
       Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
-      if (limitOpenRequestsPerWorker) {
-        for (Map.Entry<Integer, Pair<AdjustableSemaphore,
-             ConcurrentMap<ClientRequestId, RequestInfo>>>
-             entry : perWorkerOpenRequestMap.entrySet()) {
-          openRequestCounts
-              .put(entry.getKey(), entry.getValue().getRight().size());
-        }
-      } else {
-        for (ClientRequestId clientRequestId : clientRequestIdRequestInfoMap
-            .keySet()) {
-          int taskId = clientRequestId.getDestinationTaskId();
-          Integer currentCount = openRequestCounts.get(taskId);
-          openRequestCounts
-              .put(taskId, (currentCount == null ? 0 : currentCount) + 1);
-        }
+      for (ClientRequestId clientRequestId :
+          clientRequestIdRequestInfoMap.keySet()) {
+        int taskId = clientRequestId.getDestinationTaskId();
+        Integer currentCount = openRequestCounts.get(taskId);
+        openRequestCounts.put(taskId,
+            (currentCount == null ? 0 : currentCount) + 1);
       }
       // Sort it in decreasing order of number of open requests
       List<Map.Entry<Integer, Integer>> sorted =
@@ -1321,29 +907,13 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
         System.currentTimeMillis())) {
       return;
     }
-    if (limitOpenRequestsPerWorker) {
-      for (Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId,
-           RequestInfo>> pair : perWorkerOpenRequestMap.values()) {
-        checkRequestsForProblemsInMap(pair.getRight());
-      }
-    } else {
-      checkRequestsForProblemsInMap(clientRequestIdRequestInfoMap);
-    }
-  }
-
-  /**
-   * Check if there are open requests amongst particular set of requests, which
-   * have been sent a long time ago, and if so, resend them.
-   *
-   * @param requestMap The map of requests we want to check for their problem
-   */
-  private void checkRequestsForProblemsInMap(
-      ConcurrentMap<ClientRequestId, RequestInfo> requestMap) {
+    // Check if there are open requests which have been sent a long time ago,
+    // and if so, resend them.
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
     List<RequestInfo> addedRequestInfos = Lists.newArrayList();
     // Check all the requests for problems
     for (Map.Entry<ClientRequestId, RequestInfo> entry :
-        requestMap.entrySet()) {
+        clientRequestIdRequestInfoMap.entrySet()) {
       RequestInfo requestInfo = entry.getValue();
       ChannelFuture writeFuture = requestInfo.getWriteFuture();
       // Request wasn't sent yet
@@ -1375,10 +945,10 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
       ClientRequestId requestId = addedRequestIds.get(i);
       RequestInfo requestInfo = addedRequestInfos.get(i);
 
-      if (requestMap.put(requestId, requestInfo) == null) {
+      if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == null) {
         LOG.warn("checkRequestsForProblems: Request " + requestId +
             " completed prior to sending the next request");
-        requestMap.remove(requestId);
+        clientRequestIdRequestInfoMap.remove(requestId);
       }
       InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
       Channel channel = getNextChannel(remoteServer);
@@ -1429,6 +999,17 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
     return address;
   }
 
+  public FlowControl getFlowControl() {
+    return flowControl;
+  }
+
+  /**
+   * @return number of open requests
+   */
+  public int getNumberOfOpenRequests() {
+    return clientRequestIdRequestInfoMap.size();
+  }
+
   /**
    * This listener class just dumps exception stack traces if
    * something happens.
@@ -1443,48 +1024,4 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
       }
     }
   }
-
-  /**
-   * @return Maximum number of open requests for each worker (user-defined
-   *         value)
-   */
-  public short getMaxOpenRequestsPerWorker() {
-    return (short) maxOpenRequestsPerWorker;
-  }
-
-  /**
-   * Implementation of a sempahore where number of available permits can change
-   */
-  private static final class AdjustableSemaphore extends Semaphore {
-    /** Maximum number of available permits */
-    private int maxPermits;
-
-    /**
-     * Constructor
-     * @param permits initial number of available permits
-     */
-    public AdjustableSemaphore(int permits) {
-      super(permits);
-      maxPermits = permits;
-    }
-
-    /**
-     * Adjusts the maximum number of available permits.
-     *
-     * @param newMax max number of permits
-     */
-    public synchronized void setMaxPermits(int newMax) {
-      checkState(newMax >= 0, "setMaxPermits: number of permits cannot be " +
-          "less than 0");
-      int delta = newMax - this.maxPermits;
-      if (delta > 0) {
-        // Releasing semaphore to make room for 'delta' more permits
-        release(delta);
-      } else if (delta < 0) {
-        // Reducing number of permits in the semaphore
-        reducePermits(-delta);
-      }
-      this.maxPermits = newMax;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 9b348e8..1182b38 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -25,6 +25,7 @@ import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -126,4 +127,9 @@ public class NettyMasterClient implements MasterClient {
   public void closeConnections() {
     nettyClient.stop();
   }
+
+  @Override
+  public FlowControl getFlowControl() {
+    return nettyClient.getFlowControl();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index 37f4f04..26008a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.util.Progressable;
@@ -65,4 +66,9 @@ public class NettyMasterServer implements MasterServer {
   public void close() {
     nettyServer.stop();
   }
+
+  @Override
+  public void setFlowControl(FlowControl flowControl) {
+    nettyServer.setFlowControl(flowControl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 28923b8..a461bdd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
 /*if_not[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
 /*end[HADOOP_NON_SECURE]*/
@@ -60,6 +61,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;
 
 /**
@@ -418,5 +420,14 @@ public class NettyServer {
     return localHostOrIp;
   }
 
+  /**
+   * Inform the server about the flow control policy used in sending requests
+   *
+   * @param flowControl reference to the flow control used
+   */
+  public void setFlowControl(FlowControl flowControl) {
+    checkState(requestServerHandlerFactory != null);
+    requestServerHandlerFactory.setFlowControl(flowControl);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 065935d..1912c8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.netty;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -181,11 +182,8 @@ else[HADOOP_NON_SECURE]*/
 
 /*end[HADOOP_NON_SECURE]*/
 
-  /**
-   * @return Maximum number of open requests for each worker (user-defined
-   *         value)
-   */
-  public short getMaxOpenRequestsPerWorker() {
-    return nettyClient.getMaxOpenRequestsPerWorker();
+  @Override
+  public FlowControl getFlowControl() {
+    return nettyClient.getFlowControl();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index c10d49d..befce5f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.netty;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
@@ -105,4 +106,9 @@ public class NettyWorkerServer<I extends WritableComparable,
   public void close() {
     nettyServer.stop();
   }
+
+  @Override
+  public void setFlowControl(FlowControl flowControl) {
+    nettyServer.setFlowControl(flowControl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 96b096f..0dba38e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.MasterRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
@@ -37,25 +38,18 @@ public class MasterRequestServerHandler extends
    * @param myTaskInfo               Current task info
    * @param commHandler              Master communication handler
    * @param exceptionHandler         Handles uncaught exceptions
+   * @param flowControl              Reference to the flow control used
    */
   public MasterRequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo,
       MasterGlobalCommHandler commHandler,
-      Thread.UncaughtExceptionHandler exceptionHandler) {
+      Thread.UncaughtExceptionHandler exceptionHandler,
+      FlowControl flowControl) {
     super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
     this.commHandler = commHandler;
-  }
-
-  @Override
-  protected short getCurrentMaxCredit() {
-    return 0;
-  }
-
-  @Override
-  protected boolean shouldIgnoreCredit(int taskId) {
-    return true;
+    this.flowControl = flowControl;
   }
 
   @Override
@@ -69,6 +63,8 @@ public class MasterRequestServerHandler extends
   public static class Factory implements RequestServerHandler.Factory {
     /** Master aggregator handler */
     private final MasterGlobalCommHandler commHandler;
+    /** Flow control used in sending requests */
+    private FlowControl flowControl;
 
     /**
      * Constructor
@@ -86,7 +82,12 @@ public class MasterRequestServerHandler extends
         TaskInfo myTaskInfo,
         Thread.UncaughtExceptionHandler exceptionHandler) {
       return new MasterRequestServerHandler(workerRequestReservedMap, conf,
-          myTaskInfo, commHandler, exceptionHandler);
+          myTaskInfo, commHandler, exceptionHandler, flowControl);
+    }
+
+    @Override
+    public void setFlowControl(FlowControl flowControl) {
+      this.flowControl = flowControl;
     }
   }
 }

Reply via email to