Repository: giraph
Updated Branches:
  refs/heads/trunk 16dba64c3 -> 97e26e65f


Improve flow control on sender side (pre-requisite for credit-based flow 
control)

Summary: Currently, a sender worker will keep all open requests (and optionally 
up to a certain number of total open requests) in its own memory. This behavior 
may cause high memory usage in sender side. Also, since messages can arrive at 
a worker at an arbitrary rate, receiver may not have the ability to handle all 
incoming messages, hence we may see a large memory footprint in receiver as 
well. This diff addresses the problem by limiting the number of open requests 
per worker in sender side. Also, it provides a cache of unsent requests on 
sender in case the sender already sent enough messages to another worker but 
has not received any response back.

Test Plan: mvn clean verify

Reviewers: avery.ching, sergey.edunov, maja.kabiljo, dionysis.logothetis

Reviewed By: dionysis.logothetis

Subscribers: Alessio

Differential Revision: https://reviews.facebook.net/D43797


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/97e26e65
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/97e26e65
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/97e26e65

Branch: refs/heads/trunk
Commit: 97e26e65f0ca669926c9c8ddc05f19da76180ab5
Parents: 16dba64
Author: Hassan Eslami <[email protected]>
Authored: Mon Apr 4 16:29:36 2016 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Mon Apr 4 16:31:31 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 588 +++++++++++++++++--
 .../giraph/comm/netty/NettyWorkerClient.java    |   8 +
 .../comm/netty/handler/AckSignalFlag.java       |  29 +
 .../handler/MasterRequestServerHandler.java     |  10 +
 .../netty/handler/RequestServerHandler.java     |  39 +-
 .../netty/handler/ResponseClientHandler.java    |  60 +-
 .../handler/WorkerRequestServerHandler.java     |  12 +
 .../org/apache/giraph/comm/RequestTest.java     |  11 +
 8 files changed, 668 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 8ea11a5..15f0502 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -18,6 +18,9 @@
 
 package org.apache.giraph.comm.netty;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
@@ -56,12 +59,16 @@ import com.yammer.metrics.core.Counter;
 import java.io.IOException;
 /*end[HADOOP_NON_SECURE]*/
 import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -83,6 +90,7 @@ import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;
 import static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;
 import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;
@@ -92,7 +100,6 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THRE
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
-
 /**
  * Netty client for sending requests.  Thread-safe.
  */
@@ -105,6 +112,38 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
       new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
           "Maximum number of requests without confirmation we should have");
+
+  /**
+   * Do we have a limit on number of open requests we can have for each worker.
+   * Note that if this option is enabled, Netty will not keep more than a
+   * certain number of requests open for each other worker in the job. If there
+   * are more requests generated for a worker, Netty will not actually send the
+   * surplus requests, instead, it caches the requests in a local buffer. The
+   * maximum number of these unsent requests in the cache is another
+   * user-defined parameter (MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER).
+   */
+  public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER =
+      new BooleanConfOption("giraph.waitForPerWorkerRequests", false,
+          "Whether to have a limit on number of open requests for each worker" 
+
+              "or not");
+  /**
+   * Maximum number of requests we can have per worker without confirmation
+   * (i.e. open requests)
+   */
+  public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER =
+      new IntConfOption("giraph.maxOpenRequestsPerWorker", 20,
+          "Maximum number of requests without confirmation we can have per " +
+              "worker");
+  /** Aggregate number of in-memory unsent requests */
+  public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS =
+      new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000,
+          "Maximum number of unsent requests we can keep in memory");
+  /**
+   * Time interval to wait on unsent requests cache until we find a spot in it
+   */
+  public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL =
+      new IntConfOption("giraph.unsentCacheWaitInterval", 1000,
+          "Time interval to wait on unsent requests cache (in milliseconds)");
   /**
    * After pausing a thread due to too large number of open requests,
    * which fraction of these requests need to be closed before we continue
@@ -112,7 +151,7 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   public static final FloatConfOption
   FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
       new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
-          0.2f, "Fraction of requsts to close before proceeding");
+          0.2f, "Fraction of requests to close before proceeding");
   /** Maximum number of requests to list (for debugging) */
   public static final int MAX_REQUESTS_TO_LIST = 10;
   /**
@@ -167,16 +206,49 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   private final float requestSizeWarningThreshold;
   /** Maximum number of requests without confirmation we can have */
   private final int maxNumberOfOpenRequests;
+  /** Wait interval on unsent requests cache until it frees up */
+  private final int unsentWaitMsecs;
   /**
    * Maximum number of requests that can be open after the pause in order to
    * proceed
    */
   private final int numberOfRequestsToProceed;
+  /** Do we have a limit on the number of open requests per worker */
+  private final boolean limitOpenRequestsPerWorker;
+  /** Maximum number of open requests we can have for each worker */
+  private final int maxOpenRequestsPerWorker;
+  /** Total number of open requests */
+  private final AtomicInteger aggregateOpenRequests;
+  /** Total number of unsent, cached requests */
+  private final AtomicInteger aggregateUnsentRequests;
+  /**
+   * Map of requests per worker. Key in the map is the worker id and the value
+   * in the map is a pair (X, Y) where:
+   *   Y = map of client request ids to request information for a particular
+   *       worker,
+   *   X = the semaphore to control the number of open requests for the
+   *       the particular worker. Basically, the number of available permits
+   *       on this semaphore is the credit available for the worker (in credit-
+   *       based control-flow mechanism), and the number of permits already
+   *       taken from the semaphore should be equal to the size of Y (unless a
+   *       transition is happening).
+   */
+  private final ConcurrentMap<Integer,
+      Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>>
+      perWorkerOpenRequestMap;
+  /** Map of unsent, cached requests per worker */
+  private final ConcurrentMap<Integer, Deque<WritableRequest>>
+      perWorkerUnsentRequestMap;
+  /**
+   * Semaphore to control number of cached unsent requests. Maximum number of
+   * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
+   */
+  private final Semaphore unsentRequestPermit;
   /** Maximum number of connection failures */
   private final int maxConnectionFailures;
   /** Maximum number of milliseconds for a request */
   private final int maxRequestMilliseconds;
-  /** Waiting internal for checking outstanding requests msecs */
+  /** Waiting interval for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
   /** Timed logger for printing request debugging */
   private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
@@ -244,6 +316,31 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
       maxNumberOfOpenRequests = -1;
       numberOfRequestsToProceed = 0;
     }
+    limitOpenRequestsPerWorker = LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    /* Maximum number of unsent requests we can have in total */
+    if (limitOpenRequestsPerWorker) {
+      checkState(!limitNumberOfOpenRequests, "NettyClient: it is not allowed " 
+
+          "to have both limitation on the number of total open requests, and " 
+
+          "limitation on the number of open requests per worker!");
+      maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+      checkState(maxOpenRequestsPerWorker > 0x3FFF ||
+          maxOpenRequestsPerWorker < 1, "NettyClient: max number of open " +
+          "requests should be in range [1, " + 0x3FFF + "]");
+      aggregateOpenRequests = new AtomicInteger(0);
+      aggregateUnsentRequests = new AtomicInteger(0);
+      perWorkerOpenRequestMap = Maps.newConcurrentMap();
+      perWorkerUnsentRequestMap = Maps.newConcurrentMap();
+      unsentRequestPermit = new 
Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
+      unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
+    } else {
+      maxOpenRequestsPerWorker = -1;
+      aggregateOpenRequests = new AtomicInteger(-1);
+      aggregateUnsentRequests = new AtomicInteger(-1);
+      perWorkerOpenRequestMap = null;
+      perWorkerUnsentRequestMap = null;
+      unsentRequestPermit = null;
+      unsentWaitMsecs = -1;
+    }
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -340,8 +437,8 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
                   new SaslClientHandler(conf), handlerToUseExecutionGroup,
                   executionGroup, ch);
               PipelineUtils.addLastWithExecutorCheck("response-handler",
-                  new ResponseClientHandler(clientRequestIdRequestInfoMap,
-                      conf), handlerToUseExecutionGroup, executionGroup, ch);
+                  new ResponseClientHandler(NettyClient.this, conf),
+                  handlerToUseExecutionGroup, executionGroup, ch);
             } else {
               LOG.info("Using Netty without authentication.");
 /*end[HADOOP_NON_SECURE]*/
@@ -371,8 +468,8 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
                     new RequestEncoder(conf), handlerToUseExecutionGroup,
                   executionGroup, ch);
               PipelineUtils.addLastWithExecutorCheck("response-handler",
-                    new ResponseClientHandler(clientRequestIdRequestInfoMap,
-                        conf), handlerToUseExecutionGroup, executionGroup, ch);
+                  new ResponseClientHandler(NettyClient.this, conf),
+                  handlerToUseExecutionGroup, executionGroup, ch);
 
 /*if_not[HADOOP_NON_SECURE]*/
             }
@@ -429,13 +526,14 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
         Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
     for (TaskInfo taskInfo : tasks) {
       context.progress();
-      InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
+      int taskId = taskInfo.getTaskId();
+      InetSocketAddress address = taskIdAddressMap.get(taskId);
       if (address == null ||
           !address.getHostName().equals(taskInfo.getHostname()) ||
           address.getPort() != taskInfo.getPort()) {
         address = resolveAddress(maxResolveAddressAttempts,
             taskInfo.getHostOrIp(), taskInfo.getPort());
-        taskIdAddressMap.put(taskInfo.getTaskId(), address);
+        taskIdAddressMap.put(taskId, address);
       }
       if (address == null || address.getHostName() == null ||
           address.getHostName().isEmpty()) {
@@ -447,6 +545,15 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
             "address " + address);
       }
 
+      if (limitOpenRequestsPerWorker &&
+          !perWorkerOpenRequestMap.containsKey(taskId)) {
+        perWorkerOpenRequestMap.put(taskId,
+            new ImmutablePair<>(
+                new AdjustableSemaphore(maxOpenRequestsPerWorker),
+                Maps.<ClientRequestId, RequestInfo>newConcurrentMap()));
+        perWorkerUnsentRequestMap
+            .put(taskId, new ArrayDeque<WritableRequest>());
+      }
       if (addressChannelMap.containsKey(address)) {
         continue;
       }
@@ -457,7 +564,7 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
 
         waitingConnectionList.add(
             new ChannelFutureAddress(
-                connectionFuture, address, taskInfo.getTaskId()));
+                connectionFuture, address, taskId));
       }
     }
 
@@ -708,8 +815,127 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
    */
   public void sendWritableRequest(int destTaskId,
       WritableRequest request) {
+    ConcurrentMap<ClientRequestId, RequestInfo> requestMap;
+    if (limitOpenRequestsPerWorker) {
+      requestMap = reserveSpotForRequest(destTaskId, request);
+      if (requestMap == null) {
+        return;
+      }
+    } else {
+      requestMap = clientRequestIdRequestInfoMap;
+    }
+
+    doSend(destTaskId, request, requestMap);
+
+    if (limitNumberOfOpenRequests &&
+        requestMap.size() > maxNumberOfOpenRequests) {
+      long startTime = System.currentTimeMillis();
+      waitSomeRequests(numberOfRequestsToProceed);
+      timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
+    } else if (limitOpenRequestsPerWorker) {
+      aggregateOpenRequests.getAndIncrement();
+    }
+  }
+
+  /**
+   * Try to reserve a spot in the open requests map for a specific worker. If
+   * no spot is available in the open requests map, try to reserve a spot in 
the
+   * unsent requests cache. If there is no spot there too, wait until a spot
+   * becomes available either in the open requests map or unsent requests 
cache.
+   * This method is used when credit-based flow-control is enabled.
+   *
+   * @param destTaskId Destination task id
+   * @param request Request to send
+   * @return Open request map of the given task, or null if there was no spot 
in
+   *         the open request map, but there was a spot in unsent requests 
cache
+   */
+  private ConcurrentMap<ClientRequestId, RequestInfo>
+  reserveSpotForRequest(int destTaskId, WritableRequest request) {
+    Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>
+        pair = perWorkerOpenRequestMap.get(destTaskId);
+    ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight();
+    final AdjustableSemaphore openRequestPermit = pair.getLeft();
+    // Try to reserve a spot for the request amongst the open requests of
+    // the destination worker.
+    boolean shouldSend = openRequestPermit.tryAcquire();
+    boolean shouldCache = false;
+    while (!shouldSend) {
+      // We should not send the request, and should cache the request instead.
+      // It may be possible that the unsent message cache is also full, so we
+      // should try to acquire a space on the cache, and if there is no extra
+      // space in unsent request cache, we should wait until some space
+      // become available. However, it is possible that during the time we are
+      // waiting on the unsent messages cache, actual buffer for open requests
+      // frees up space.
+      try {
+        shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs,
+            TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("sendWritableRequest: failed " +
+            "while waiting on the unsent request cache to have some more " +
+            "room for extra unsent requests!");
+      }
+      if (shouldCache) {
+        break;
+      }
+      // We may have an open spot in the meantime that we were waiting on the
+      // unsent requests.
+      shouldSend = openRequestPermit.tryAcquire();
+      if (shouldSend) {
+        break;
+      }
+      // The current thread will be at this point only if it could not make
+      // space amongst open requests for the destination worker and has been
+      // timed-out in trying to acquire a space amongst unsent messages. So,
+      // we should report logs, report progress, and check for request
+      // failures.
+      logInfoAboutOpenRequests(-1);
+      context.progress();
+      checkRequestsForProblems();
+    }
+    // Either shouldSend == true or shouldCache == true
+    if (shouldCache) {
+      Deque<WritableRequest> unsentRequests =
+          perWorkerUnsentRequestMap.get(destTaskId);
+      // This synchronize block is necessary for the following reason:
+      // Once we are at this point, it means there was no room for this
+      // request to become an open request, hence we have to put it into
+      // unsent cache. Consider the case that since last time we checked if
+      // there is any room for an additional open request so far, all open
+      // requests are delivered and their acknowledgements are also processed.
+      // Now, if we put this request in the unsent cache, it is not being
+      // considered to become an open request, as the only one who checks
+      // on this matter would be the one who receives an acknowledgment for an
+      // open request for the destination worker. So, a lock is necessary
+      // to forcefully serialize the execution if this scenario is about to
+      // happen.
+      synchronized (unsentRequests) {
+        shouldSend = openRequestPermit.tryAcquire();
+        if (!shouldSend) {
+          aggregateUnsentRequests.getAndIncrement();
+          unsentRequests.add(request);
+          return null;
+        }
+      }
+    }
+    return requestMap;
+  }
+
+  /**
+   * Sends a request.
+   *
+   * @param destTaskId destination to send the request to
+   * @param request request itself
+   * @param requestMap the map that should hold the request, used for tracking
+   *                   the request later
+   */
+  private void doSend(int destTaskId, WritableRequest request,
+      ConcurrentMap<ClientRequestId, RequestInfo> requestMap) {
     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
-    if (clientRequestIdRequestInfoMap.isEmpty()) {
+    if ((limitOpenRequestsPerWorker &&
+        aggregateOpenRequests.get() == 0) ||
+        (!limitOpenRequestsPerWorker &&
+            requestMap.isEmpty())) {
       inboundByteCounter.resetAll();
       outboundByteCounter.resetAll();
     }
@@ -728,7 +954,7 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
         addressRequestIdGenerator.getNextRequestId(remoteServer));
       ClientRequestId clientRequestId =
         new ClientRequestId(destTaskId, request.getRequestId());
-      RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
+      RequestInfo oldRequestInfo = requestMap.putIfAbsent(
         clientRequestId, newRequestInfo);
       if (oldRequestInfo != null) {
         throw new IllegalStateException("sendWritableRequest: Impossible to " +
@@ -737,7 +963,7 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
       }
     }
     if (request.getSerializedSize() >
-      requestSizeWarningThreshold * sendBufferSize) {
+        requestSizeWarningThreshold * sendBufferSize) {
       LOG.warn("Creating large request of type " + request.getClass() +
         ", size " + request.getSerializedSize() +
         " bytes. Check netty buffer size.");
@@ -745,13 +971,178 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
     ChannelFuture writeFuture = channel.write(request);
     newRequestInfo.setWriteFuture(writeFuture);
     writeFuture.addListener(logErrorListener);
+  }
 
-    if (limitNumberOfOpenRequests &&
-        clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
-      long startTime = System.currentTimeMillis();
-      waitSomeRequests(numberOfRequestsToProceed);
-      timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
+  /**
+   * Get the response flag from a response
+   *
+   * @param response response received
+   * @return AckSignalFlag coming with the response
+   */
+  public static AckSignalFlag getAckSignalFlag(short response) {
+    return AckSignalFlag.values()[(response >> 15) & 1];
+  }
+
+  /**
+   * Whether response specifies that credit should be ignored
+   *
+   * @param response response received
+   * @return true iff credit should be ignored, false otherwise
+   */
+  public static boolean shouldIgnoreCredit(short response) {
+    return ((short) ((response >> 14) & 1)) == 1;
+  }
+
+  /**
+   * Get the credit from a response
+   *
+   * @param response response received
+   * @return credit from the received response
+   */
+  public static short getCredit(short response) {
+    return (short) (response & 0x3FFF);
+  }
+
+  /**
+   * Calculate/Build a response for given flag and credit
+   *
+   * @param flag AckSignalFlag that should be sent with the response
+   * @param ignoreCredit whether this response specified to ignore the credit
+   * @param credit if credit is not ignored, what should be the max credit
+   * @return the response to be sent out along with the acknowledgement
+   */
+  public static short calculateResponse(AckSignalFlag flag,
+                                        boolean ignoreCredit, short credit) {
+    if (credit > 0x3FFF || credit < 1) {
+      LOG.warn("calculateResponse: value of the credit is " + credit +
+          " while it should be in range [1, " + 0x3FFF + "]");
+      credit = (short) Math.max((short) Math.min(credit, 0x3FFF), 1);
     }
+    return (short) ((flag.ordinal() << 15) |
+        ((ignoreCredit ? 1 : 0) << 14) | (credit & 0x3FFF));
+  }
+
+  /**
+   * Handle receipt of a message. Called by response handler.
+   *
+   * @param senderId Id of sender of the message
+   * @param requestId Id of the request
+   * @param response Actual response
+   * @param shouldDrop Drop the message?
+   */
+  public void messageReceived(int senderId, long requestId, short response,
+      boolean shouldDrop) {
+    if (shouldDrop) {
+      if (!limitOpenRequestsPerWorker) {
+        synchronized (clientRequestIdRequestInfoMap) {
+          clientRequestIdRequestInfoMap.notifyAll();
+        }
+      }
+      return;
+    }
+    boolean shouldIgnoreCredit = shouldIgnoreCredit(response);
+    short credit = getCredit(response);
+    AckSignalFlag responseFlag = getAckSignalFlag(response);
+    if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) {
+      LOG.info("messageReceived: Already completed request (taskId = " +
+          senderId + ", requestId = " + requestId + ")");
+    } else if (responseFlag != AckSignalFlag.NEW_REQUEST) {
+      throw new IllegalStateException(
+          "messageReceived: Got illegal response " + response);
+    }
+    RequestInfo requestInfo;
+    int numOpenRequests;
+    if (limitOpenRequestsPerWorker) {
+      requestInfo =
+          processResponse(senderId, requestId, shouldIgnoreCredit, credit);
+      numOpenRequests = aggregateOpenRequests.get();
+      if (numOpenRequests == 0) {
+        synchronized (aggregateOpenRequests) {
+          aggregateOpenRequests.notifyAll();
+        }
+      }
+    } else {
+      requestInfo = clientRequestIdRequestInfoMap
+          .remove(new ClientRequestId(senderId, requestId));
+      numOpenRequests = clientRequestIdRequestInfoMap.size();
+    }
+    if (requestInfo == null) {
+      LOG.info("messageReceived: Already received response for (taskId = " +
+          senderId + ", requestId = " + requestId + ")");
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
+            requestInfo + ".  Waiting on " + numOpenRequests + " requests");
+      }
+    }
+
+    if (!limitOpenRequestsPerWorker) {
+      // Help waitSomeRequests() to finish faster
+      synchronized (clientRequestIdRequestInfoMap) {
+        clientRequestIdRequestInfoMap.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Process a response for credit based flow-control. Open up a spot in the
+   * open request map of the sender, and send from unsent requests cache as 
much
+   * as there is open space in the open request map.
+   *
+   * @param senderId the sender from whom we got the response
+   * @param requestId the id of the request the response belongs to
+   * @param ignoreCredit whether credit based mechanism should take action
+   * @param credit what should be the credit if credit based mechanism is
+   *               enabled
+   * @return Info of the request the response belongs to
+   */
+  private RequestInfo processResponse(int senderId, long requestId,
+                                      boolean ignoreCredit, short credit) {
+    Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>
+        pair = perWorkerOpenRequestMap.get(senderId);
+    ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight();
+    RequestInfo requestInfo =
+        requestMap.remove(new ClientRequestId(senderId, requestId));
+    AdjustableSemaphore openRequestPermit = pair.getLeft();
+    if (requestInfo != null) {
+      openRequestPermit.release();
+      aggregateOpenRequests.getAndDecrement();
+      if (!ignoreCredit) {
+        openRequestPermit.setMaxPermits(credit);
+      }
+      Deque<WritableRequest> requestDeque =
+          perWorkerUnsentRequestMap.get(senderId);
+      // Since we received a response and we changed the credit of the sender
+      // client, we may be able to send some more requests to the sender
+      // client. So, we try to send as much request as we can to the sender
+      // client.
+      while (true) {
+        WritableRequest request;
+        synchronized (requestDeque) {
+          request = requestDeque.pollFirst();
+          if (request == null) {
+            break;
+          }
+          // See whether the sender client has any unused credit
+          if (!openRequestPermit.tryAcquire()) {
+            requestDeque.offerFirst(request);
+            break;
+          }
+        }
+        // At this point, we have a request, and we reserved a credit for the
+        // sender client. So, we send the request to the client and update
+        // the state.
+        doSend(senderId, request, requestMap);
+        if (aggregateUnsentRequests.decrementAndGet() == 0) {
+          synchronized (aggregateUnsentRequests) {
+            aggregateUnsentRequests.notifyAll();
+          }
+        }
+        aggregateOpenRequests.getAndIncrement();
+        unsentRequestPermit.release();
+      }
+    }
+    return requestInfo;
   }
 
   /**
@@ -760,7 +1151,12 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
    * @throws InterruptedException
    */
   public void waitAllRequests() {
-    waitSomeRequests(0);
+    if (limitOpenRequestsPerWorker) {
+      waitSomeRequests(aggregateUnsentRequests);
+      waitSomeRequests(aggregateOpenRequests);
+    } else {
+      waitSomeRequests(0);
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("waitAllRequests: Finished all requests. " +
           inboundByteCounter.getMetrics() + "\n" + outboundByteCounter
@@ -769,6 +1165,36 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
   }
 
   /**
+   * Wait for some requests to complete. The aggregate number of such requests
+   * is given. Periodically check the state of current open requests. If there
+   * is an issue in any of them, re-send the request.
+   *
+   * @param aggregateRequests object keeping aggregate number of requests to
+   *                          wait for
+   */
+  private void waitSomeRequests(final AtomicInteger aggregateRequests) {
+    while (true) {
+      synchronized (aggregateRequests) {
+        if (aggregateRequests.get() == 0) {
+          break;
+        }
+        try {
+          aggregateRequests.wait(waitingRequestMsecs);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("waitSomeRequests: failed while " +
+              "waiting on open/cached requests");
+        }
+        if (aggregateRequests.get() == 0) {
+          break;
+        }
+      }
+      logInfoAboutOpenRequests(-1);
+      context.progress();
+      checkRequestsForProblems();
+    }
+  }
+
+  /**
    * Ensure that at most maxOpenRequests are not complete.  Periodically,
    * check the state of every request.  If we find the connection failed,
    * re-establish it and re-send the request.
@@ -803,30 +1229,55 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
    * @param maxOpenRequests Maximum number of requests which can be not 
complete
    */
   private void logInfoAboutOpenRequests(int maxOpenRequests) {
+    int numOpenRequests = limitOpenRequestsPerWorker ?
+        aggregateOpenRequests.get() :
+        clientRequestIdRequestInfoMap.size();
     if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {
       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
-          waitingRequestMsecs + " msecs, " +
-          clientRequestIdRequestInfoMap.size() +
-          " open requests, waiting for it to be <= " + maxOpenRequests +
-          ", " + inboundByteCounter.getMetrics() + "\n" +
+          waitingRequestMsecs + " msecs, " + numOpenRequests +
+          " open requests, " + (limitOpenRequestsPerWorker ?
+          (aggregateUnsentRequests.get() + " unsent requests, ") :
+          ("waiting for it to be <= " + maxOpenRequests + ", ")) +
+          inboundByteCounter.getMetrics() + "\n" +
           outboundByteCounter.getMetrics());
 
-      if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
-        for (Map.Entry<ClientRequestId, RequestInfo> entry :
-            clientRequestIdRequestInfoMap.entrySet()) {
-          LOG.info("logInfoAboutOpenRequests: Waiting for request " +
-              entry.getKey() + " - " + entry.getValue());
+      if (numOpenRequests < MAX_REQUESTS_TO_LIST) {
+        if (limitOpenRequestsPerWorker) {
+          for (Pair<AdjustableSemaphore,
+               ConcurrentMap<ClientRequestId, RequestInfo>>
+               pair : perWorkerOpenRequestMap.values()) {
+            for (Map.Entry<ClientRequestId, RequestInfo> request :
+                pair.getRight().entrySet()) {
+              LOG.info("logInfoAboutOpenRequests: Waiting for request " +
+                  request.getKey() + " - " + request.getValue());
+            }
+          }
+        } else {
+          for (Map.Entry<ClientRequestId, RequestInfo> entry :
+              clientRequestIdRequestInfoMap.entrySet()) {
+            LOG.info("logInfoAboutOpenRequests: Waiting for request " +
+                entry.getKey() + " - " + entry.getValue());
+          }
         }
       }
 
       // Count how many open requests each task has
       Map<Integer, Integer> openRequestCounts = Maps.newHashMap();
-      for (ClientRequestId clientRequestId :
-          clientRequestIdRequestInfoMap.keySet()) {
-        int taskId = clientRequestId.getDestinationTaskId();
-        Integer currentCount = openRequestCounts.get(taskId);
-        openRequestCounts.put(taskId,
-            (currentCount == null ? 0 : currentCount) + 1);
+      if (limitOpenRequestsPerWorker) {
+        for (Map.Entry<Integer, Pair<AdjustableSemaphore,
+             ConcurrentMap<ClientRequestId, RequestInfo>>>
+             entry : perWorkerOpenRequestMap.entrySet()) {
+          openRequestCounts
+              .put(entry.getKey(), entry.getValue().getRight().size());
+        }
+      } else {
+        for (ClientRequestId clientRequestId : clientRequestIdRequestInfoMap
+            .keySet()) {
+          int taskId = clientRequestId.getDestinationTaskId();
+          Integer currentCount = openRequestCounts.get(taskId);
+          openRequestCounts
+              .put(taskId, (currentCount == null ? 0 : currentCount) + 1);
+        }
       }
       // Sort it in decreasing order of number of open requests
       List<Map.Entry<Integer, Integer>> sorted =
@@ -870,11 +1321,29 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
         System.currentTimeMillis())) {
       return;
     }
+    if (limitOpenRequestsPerWorker) {
+      for (Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId,
+           RequestInfo>> pair : perWorkerOpenRequestMap.values()) {
+        checkRequestsForProblemsInMap(pair.getRight());
+      }
+    } else {
+      checkRequestsForProblemsInMap(clientRequestIdRequestInfoMap);
+    }
+  }
+
+  /**
+   * Check if there are open requests amongst a particular set of requests,
+   * which have been sent a long time ago, and if so, resend them.
+   *
+   * @param requestMap The map of requests we want to check for problems
+   */
+  private void checkRequestsForProblemsInMap(
+      ConcurrentMap<ClientRequestId, RequestInfo> requestMap) {
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
     List<RequestInfo> addedRequestInfos = Lists.newArrayList();
     // Check all the requests for problems
     for (Map.Entry<ClientRequestId, RequestInfo> entry :
-        clientRequestIdRequestInfoMap.entrySet()) {
+        requestMap.entrySet()) {
       RequestInfo requestInfo = entry.getValue();
       ChannelFuture writeFuture = requestInfo.getWriteFuture();
       // Request wasn't sent yet
@@ -906,11 +1375,10 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
       ClientRequestId requestId = addedRequestIds.get(i);
       RequestInfo requestInfo = addedRequestInfos.get(i);
 
-      if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==
-          null) {
+      if (requestMap.put(requestId, requestInfo) == null) {
         LOG.warn("checkRequestsForProblems: Request " + requestId +
             " completed prior to sending the next request");
-        clientRequestIdRequestInfoMap.remove(requestId);
+        requestMap.remove(requestId);
       }
       InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
       Channel channel = getNextChannel(remoteServer);
@@ -975,4 +1443,48 @@ public class NettyClient implements 
ResetSuperstepMetricsObserver {
       }
     }
   }
+
+  /**
+   * @return Maximum number of open requests for each worker (user-defined
+   *         value)
+   */
+  public short getMaxOpenRequestsPerWorker() {
+    return (short) maxOpenRequestsPerWorker;
+  }
+
+  /**
+   * Implementation of a semaphore where number of available permits can change
+   */
+  private static final class AdjustableSemaphore extends Semaphore {
+    /** Maximum number of available permits */
+    private int maxPermits;
+
+    /**
+     * Constructor
+     * @param permits initial number of available permits
+     */
+    public AdjustableSemaphore(int permits) {
+      super(permits);
+      maxPermits = permits;
+    }
+
+    /**
+     * Adjusts the maximum number of available permits.
+     *
+     * @param newMax max number of permits
+     */
+    public synchronized void setMaxPermits(int newMax) {
+      checkState(newMax >= 0, "setMaxPermits: number of permits cannot be " +
+          "less than 0");
+      int delta = newMax - this.maxPermits;
+      if (delta > 0) {
+        // Releasing semaphore to make room for 'delta' more permits
+        release(delta);
+      } else if (delta < 0) {
+        // Reducing number of permits in the semaphore
+        reducePermits(-delta);
+      }
+      this.maxPermits = newMax;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 2a89109..065935d 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -180,4 +180,12 @@ else[HADOOP_NON_SECURE]*/
   }
 
 /*end[HADOOP_NON_SECURE]*/
+
+  /**
+   * @return Maximum number of open requests for each worker (user-defined
+   *         value)
+   */
+  public short getMaxOpenRequestsPerWorker() {
+    return nettyClient.getMaxOpenRequestsPerWorker();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java
new file mode 100644
index 0000000..4f75021
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+/**
+ * Flag used to differentiate acknowledgement responses.
+ */
+public enum AckSignalFlag {
+  /** The request is/was new */
+  NEW_REQUEST,
+  /** The request is/was not new, and is/was already processed */
+  DUPLICATE_REQUEST
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 9aa88ae..96b096f 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -49,6 +49,16 @@ public class MasterRequestServerHandler extends
   }
 
   @Override
+  protected short getCurrentMaxCredit() {
+    return 0;
+  }
+
+  @Override
+  protected boolean shouldIgnoreCredit(int taskId) {
+    return true;
+  }
+
+  @Override
   public void processRequest(MasterRequest request) {
     request.doRequest(commHandler);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index d75870a..ab96714 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
@@ -40,7 +41,7 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES
 public abstract class RequestServerHandler<R> extends
   ChannelInboundHandlerAdapter {
   /** Number of bytes in the encoded response */
-  public static final int RESPONSE_BYTES = 13;
+  public static final int RESPONSE_BYTES = 14;
   /** Time class to use */
   private static Time TIME = SystemTime.get();
   /** Class logger */
@@ -58,6 +59,8 @@ public abstract class RequestServerHandler<R> extends
   private long startProcessingNanoseconds = -1;
   /** Handler for uncaught exceptions */
   private final Thread.UncaughtExceptionHandler exceptionHandler;
+  /** Do we have a limit on the number of open requests per worker */
+  private final boolean limitOpenRequestsPerWorker;
 
   /**
    * Constructor
@@ -76,6 +79,8 @@ public abstract class RequestServerHandler<R> extends
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
     this.myTaskInfo = myTaskInfo;
     this.exceptionHandler = exceptionHandler;
+    this.limitOpenRequestsPerWorker =
+        NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
   }
 
   @Override
@@ -98,7 +103,7 @@ public abstract class RequestServerHandler<R> extends
     }
 
     // Only execute this request exactly once
-    int alreadyDone = 1;
+    AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST;
     if (workerRequestReservedMap.reserveRequest(
         request.getClientId(),
         request.getRequestId())) {
@@ -113,7 +118,7 @@ public abstract class RequestServerHandler<R> extends
             ", " +  request.getType() + " took " +
             Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
       }
-      alreadyDone = 0;
+      alreadyDone = AckSignalFlag.NEW_REQUEST;
     } else {
       LOG.info("messageReceived: Request id " +
           request.getRequestId() + " from client " +
@@ -126,12 +131,38 @@ public abstract class RequestServerHandler<R> extends
     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
     buffer.writeInt(myTaskInfo.getTaskId());
     buffer.writeLong(request.getRequestId());
-    buffer.writeByte(alreadyDone);
+    short signal;
+    if (limitOpenRequestsPerWorker) {
+      signal = NettyClient.calculateResponse(alreadyDone,
+          shouldIgnoreCredit(request.getClientId()), getCurrentMaxCredit());
+    } else {
+      signal = (short) alreadyDone.ordinal();
+    }
+    buffer.writeShort(signal);
 
     ctx.write(buffer);
   }
 
   /**
+   * Get the maximum number of open requests per worker (credit) at the moment
+   * the method is called. This number should generally depend on the available
+   * memory and processing rate.
+   *
+   * @return maximum number of open requests for each worker
+   */
+  protected abstract short getCurrentMaxCredit();
+
+  /**
+   * Whether we should ignore credit-based flow control in communicating with
+   * the task with a given id. Generally, communication with the master node
+   * does not require any flow-control mechanism.
+   *
+   * @param taskId id of the task on the other end of the communication
+   * @return true if credit should be ignored, false otherwise
+   */
+  protected abstract boolean shouldIgnoreCredit(int taskId);
+
+  /**
    * Set the flag indicating already closed first request
    */
   private static void setAlreadyClosedFirstRequest() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index f0fd1e5..54cb201 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -18,11 +18,10 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.ConcurrentMap;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -41,22 +40,16 @@ public class ResponseClientHandler extends 
ChannelInboundHandlerAdapter {
   private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false;
   /** Drop first response (used for simulating failure) */
   private final boolean dropFirstResponse;
-  /** Outstanding worker request map */
-  private final ConcurrentMap<ClientRequestId,
-      RequestInfo> workerIdOutstandingRequestMap;
+  /** Netty client that does the actual I/O and keeps track of open requests */
+  private final NettyClient nettyClient;
 
   /**
    * Constructor.
-   *
-   * @param workerIdOutstandingRequestMap Map of worker ids to outstanding
-   *                                      requests
+   * @param nettyClient Client that does the actual I/O
    * @param conf Configuration
    */
-  public ResponseClientHandler(
-      ConcurrentMap<ClientRequestId, RequestInfo>
-          workerIdOutstandingRequestMap,
-      Configuration conf) {
-    this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
+  public ResponseClientHandler(NettyClient nettyClient, Configuration conf) {
+    this.nettyClient = nettyClient;
     dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf);
   }
 
@@ -64,61 +57,34 @@ public class ResponseClientHandler extends 
ChannelInboundHandlerAdapter {
   public void channelRead(ChannelHandlerContext ctx, Object msg)
     throws Exception {
     if (!(msg instanceof ByteBuf)) {
-      throw new IllegalStateException("messageReceived: Got a " +
+      throw new IllegalStateException("channelRead: Got a " +
           "non-ByteBuf message " + msg);
     }
 
     ByteBuf buf = (ByteBuf) msg;
     int senderId = -1;
     long requestId = -1;
-    int response = -1;
+    short response = -1;
     try {
       senderId = buf.readInt();
       requestId = buf.readLong();
-      response = buf.readByte();
+      response = buf.readShort();
     } catch (IndexOutOfBoundsException e) {
       throw new IllegalStateException(
           "channelRead: Got IndexOutOfBoundsException ", e);
     }
     ReferenceCountUtil.release(buf);
 
-
+    boolean shouldDrop = false;
     // Simulate a failed response on the first response (if desired)
     if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
-      LOG.info("messageReceived: Simulating dropped response " + response +
+      LOG.info("channelRead: Simulating dropped response " + response +
           " for request " + requestId);
       setAlreadyDroppedFirstResponse();
-      synchronized (workerIdOutstandingRequestMap) {
-        workerIdOutstandingRequestMap.notifyAll();
-      }
-      return;
-    }
-
-    if (response == 1) {
-      LOG.info("messageReceived: Already completed request (taskId = " +
-          senderId + ", requestId = " + requestId + ")");
-    } else if (response != 0) {
-      throw new IllegalStateException(
-          "messageReceived: Got illegal response " + response);
-    }
-
-    RequestInfo requestInfo = workerIdOutstandingRequestMap.remove(
-        new ClientRequestId(senderId, requestId));
-    if (requestInfo == null) {
-      LOG.info("messageReceived: Already received response for (taskId = " +
-          senderId + ", requestId = " + requestId + ")");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
-            requestInfo + ".  Waiting on " + workerIdOutstandingRequestMap
-            .size() + " requests");
-      }
+      shouldDrop = true;
     }
 
-    // Help NettyClient#waitSomeRequests() to finish faster
-    synchronized (workerIdOutstandingRequestMap) {
-      workerIdOutstandingRequestMap.notifyAll();
-    }
+    nettyClient.messageReceived(senderId, requestId, response, shouldDrop);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index 574e413..18e79ce 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.requests.WorkerRequest;
@@ -58,6 +59,17 @@ public class WorkerRequestServerHandler<I extends 
WritableComparable,
   }
 
   @Override
+  protected short getCurrentMaxCredit() {
+    return ((NettyWorkerClient) (serverData.getServiceWorker()
+        .getWorkerClient())).getMaxOpenRequestsPerWorker();
+  }
+
+  @Override
+  protected boolean shouldIgnoreCredit(int taskId) {
+    return taskId == serverData.getServiceWorker().getMasterInfo().getTaskId();
+  }
+
+  @Override
   public void processRequest(WorkerRequest<I, V, E> request) {
     request.doRequest(serverData);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 0462770..b782fe5 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
@@ -312,4 +313,14 @@ public class RequestTest {
     }
     assertEquals(55, keySum);
   }
+
+  @Test
+
+  public void creditBasedResponseTest() throws IOException {
+    short response = NettyClient.calculateResponse(AckSignalFlag.NEW_REQUEST,
+        true, (short) 256);
+    assertEquals(NettyClient.getAckSignalFlag(response), 
AckSignalFlag.NEW_REQUEST);
+    assertEquals(NettyClient.shouldIgnoreCredit(response), true);
+    assertEquals(NettyClient.getCredit(response), (short) 256);
+  }
 }
\ No newline at end of file

Reply via email to