Repository: giraph Updated Branches: refs/heads/trunk 16dba64c3 -> 97e26e65f
Improve flow control on sender side (pre-requisite for credit-based flow control) Summary: Currently, a sender worker will keep all open requests (and optionally up to a certain number of total open requests) in its own memory. This behavior may cause high memory usage in sender side. Also, since messages can arrive to a worker at an arbitrary rate, receiver may not have the ability to handle all incoming messages, hence we may see a large memory footprint in receiver as well. This diff addresses the problem by limiting the number of open requests per worker in sender side. Also, it provides a cache of unsent requests on sender in case the sender already sent enough messages to another worker but has not received any response back. Test Plan: mvn clean verify Reviewers: avery.ching, sergey.edunov, maja.kabiljo, dionysis.logothetis Reviewed By: dionysis.logothetis Subscribers: Alessio Differential Revision: https://reviews.facebook.net/D43797 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/97e26e65 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/97e26e65 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/97e26e65 Branch: refs/heads/trunk Commit: 97e26e65f0ca669926c9c8ddc05f19da76180ab5 Parents: 16dba64 Author: Hassan Eslami <[email protected]> Authored: Mon Apr 4 16:29:36 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Apr 4 16:31:31 2016 -0700 ---------------------------------------------------------------------- .../apache/giraph/comm/netty/NettyClient.java | 588 +++++++++++++++++-- .../giraph/comm/netty/NettyWorkerClient.java | 8 + .../comm/netty/handler/AckSignalFlag.java | 29 + .../handler/MasterRequestServerHandler.java | 10 + .../netty/handler/RequestServerHandler.java | 39 +- .../netty/handler/ResponseClientHandler.java | 60 +- .../handler/WorkerRequestServerHandler.java | 12 + .../org/apache/giraph/comm/RequestTest.java | 11 + 8 files changed, 668 
insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 8ea11a5..15f0502 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -18,6 +18,9 @@ package org.apache.giraph.comm.netty; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.comm.netty.handler.AckSignalFlag; import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator; import org.apache.giraph.comm.netty.handler.ClientRequestId; import org.apache.giraph.comm.netty.handler.RequestEncoder; @@ -56,12 +59,16 @@ import com.yammer.metrics.core.Counter; import java.io.IOException; /*end[HADOOP_NON_SECURE]*/ import java.net.InetSocketAddress; +import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -83,6 +90,7 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; +import static com.google.common.base.Preconditions.checkState; import static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE; import static 
org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE; import static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS; @@ -92,7 +100,6 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THRE import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER; import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES; import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; - /** * Netty client for sending requests. Thread-safe. */ @@ -105,6 +112,38 @@ public class NettyClient implements ResetSuperstepMetricsObserver { public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS = new IntConfOption("giraph.maxNumberOfOpenRequests", 10000, "Maximum number of requests without confirmation we should have"); + + /** + * Do we have a limit on number of open requests we can have for each worker. + * Note that if this option is enabled, Netty will not keep more than a + * certain number of requests open for each other worker in the job. If there + * are more requests generated for a worker, Netty will not actually send the + * surplus requests, instead, it caches the requests in a local buffer. The + * maximum number of these unsent requests in the cache is another + * user-defined parameter (MAX_NUM_OF_UNSENT_REQUESTS). + */ + public static final BooleanConfOption LIMIT_OPEN_REQUESTS_PER_WORKER = + new BooleanConfOption("giraph.waitForPerWorkerRequests", false, + "Whether to have a limit on number of open requests for each worker" + + "or not"); + /** + * Maximum number of requests we can have per worker without confirmation + * (i.e. 
open requests) + */ + public static final IntConfOption MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER = + new IntConfOption("giraph.maxOpenRequestsPerWorker", 20, + "Maximum number of requests without confirmation we can have per " + + "worker"); + /** Aggregate number of in-memory unsent requests */ + public static final IntConfOption MAX_NUM_OF_UNSENT_REQUESTS = + new IntConfOption("giraph.maxNumberOfUnsentRequests", 2000, + "Maximum number of unsent requests we can keep in memory"); + /** + * Time interval to wait on unsent requests cache until we find a spot in it + */ + public static final IntConfOption UNSENT_CACHE_WAIT_INTERVAL = + new IntConfOption("giraph.unsentCacheWaitInterval", 1000, + "Time interval to wait on unsent requests cache (in milliseconds)"); /** * After pausing a thread due to too large number of open requests, * which fraction of these requests need to be closed before we continue @@ -112,7 +151,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver { public static final FloatConfOption FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING = new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding", - 0.2f, "Fraction of requsts to close before proceeding"); + 0.2f, "Fraction of requests to close before proceeding"); /** Maximum number of requests to list (for debugging) */ public static final int MAX_REQUESTS_TO_LIST = 10; /** @@ -167,16 +206,49 @@ public class NettyClient implements ResetSuperstepMetricsObserver { private final float requestSizeWarningThreshold; /** Maximum number of requests without confirmation we can have */ private final int maxNumberOfOpenRequests; + /** Wait interval on unsent requests cache until it frees up */ + private final int unsentWaitMsecs; /** * Maximum number of requests that can be open after the pause in order to * proceed */ private final int numberOfRequestsToProceed; + /** Do we have a limit on the number of open requests per worker */ + private final boolean limitOpenRequestsPerWorker; + 
/** Maximum number of open requests we can have for each worker */ + private final int maxOpenRequestsPerWorker; + /** Total number of open requests */ + private final AtomicInteger aggregateOpenRequests; + /** Total number of unsent, cached requests */ + private final AtomicInteger aggregateUnsentRequests; + /** + * Map of requests per worker. Key in the map is the worker id and the value + * in the map is a pair (X, Y) where: + * Y = map of client request ids to request information for a particular + * worker, + * X = the semaphore to control the number of open requests for the + * the particular worker. Basically, the number of available permits + * on this semaphore is the credit available for the worker (in credit- + * based control-flow mechanism), and the number of permits already + * taken from the semaphore should be equal to the size of Y (unless a + * transition is happening). + */ + private final ConcurrentMap<Integer, + Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>>> + perWorkerOpenRequestMap; + /** Map of unsent, cached requests per worker */ + private final ConcurrentMap<Integer, Deque<WritableRequest>> + perWorkerUnsentRequestMap; + /** + * Semaphore to control number of cached unsent request. Maximum number of + * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS. 
+ */ + private final Semaphore unsentRequestPermit; /** Maximum number of connection failures */ private final int maxConnectionFailures; /** Maximum number of milliseconds for a request */ private final int maxRequestMilliseconds; - /** Waiting internal for checking outstanding requests msecs */ + /** Waiting interval for checking outstanding requests msecs */ private final int waitingRequestMsecs; /** Timed logger for printing request debugging */ private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG); @@ -244,6 +316,31 @@ public class NettyClient implements ResetSuperstepMetricsObserver { maxNumberOfOpenRequests = -1; numberOfRequestsToProceed = 0; } + limitOpenRequestsPerWorker = LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); + /* Maximum number of unsent requests we can have in total */ + if (limitOpenRequestsPerWorker) { + checkState(!limitNumberOfOpenRequests, "NettyClient: it is not allowed " + + "to have both limitation on the number of total open requests, and " + + "limitation on the number of open requests per worker!"); + maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); + checkState(maxOpenRequestsPerWorker <= 0x3FFF && + maxOpenRequestsPerWorker >= 1, "NettyClient: max number of open " + + "requests should be in range [1, " + 0x3FFF + "]"); + aggregateOpenRequests = new AtomicInteger(0); + aggregateUnsentRequests = new AtomicInteger(0); + perWorkerOpenRequestMap = Maps.newConcurrentMap(); + perWorkerUnsentRequestMap = Maps.newConcurrentMap(); + unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf)); + unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf); + } else { + maxOpenRequestsPerWorker = -1; + aggregateOpenRequests = new AtomicInteger(-1); + aggregateUnsentRequests = new AtomicInteger(-1); + perWorkerOpenRequestMap = null; + perWorkerUnsentRequestMap = null; + unsentRequestPermit = null; + unsentWaitMsecs = -1; + } maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf); 
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf); @@ -340,8 +437,8 @@ public class NettyClient implements ResetSuperstepMetricsObserver { new SaslClientHandler(conf), handlerToUseExecutionGroup, executionGroup, ch); PipelineUtils.addLastWithExecutorCheck("response-handler", - new ResponseClientHandler(clientRequestIdRequestInfoMap, - conf), handlerToUseExecutionGroup, executionGroup, ch); + new ResponseClientHandler(NettyClient.this, conf), + handlerToUseExecutionGroup, executionGroup, ch); } else { LOG.info("Using Netty without authentication."); /*end[HADOOP_NON_SECURE]*/ @@ -371,8 +468,8 @@ public class NettyClient implements ResetSuperstepMetricsObserver { new RequestEncoder(conf), handlerToUseExecutionGroup, executionGroup, ch); PipelineUtils.addLastWithExecutorCheck("response-handler", - new ResponseClientHandler(clientRequestIdRequestInfoMap, - conf), handlerToUseExecutionGroup, executionGroup, ch); + new ResponseClientHandler(NettyClient.this, conf), + handlerToUseExecutionGroup, executionGroup, ch); /*if_not[HADOOP_NON_SECURE]*/ } @@ -429,13 +526,14 @@ public class NettyClient implements ResetSuperstepMetricsObserver { Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer); for (TaskInfo taskInfo : tasks) { context.progress(); - InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId()); + int taskId = taskInfo.getTaskId(); + InetSocketAddress address = taskIdAddressMap.get(taskId); if (address == null || !address.getHostName().equals(taskInfo.getHostname()) || address.getPort() != taskInfo.getPort()) { address = resolveAddress(maxResolveAddressAttempts, taskInfo.getHostOrIp(), taskInfo.getPort()); - taskIdAddressMap.put(taskInfo.getTaskId(), address); + taskIdAddressMap.put(taskId, address); } if (address == null || address.getHostName() == null || address.getHostName().isEmpty()) { @@ -447,6 +545,15 @@ public class NettyClient implements ResetSuperstepMetricsObserver { "address " + address); } + if 
(limitOpenRequestsPerWorker && + !perWorkerOpenRequestMap.containsKey(taskId)) { + perWorkerOpenRequestMap.put(taskId, + new ImmutablePair<>( + new AdjustableSemaphore(maxOpenRequestsPerWorker), + Maps.<ClientRequestId, RequestInfo>newConcurrentMap())); + perWorkerUnsentRequestMap + .put(taskId, new ArrayDeque<WritableRequest>()); + } if (addressChannelMap.containsKey(address)) { continue; } @@ -457,7 +564,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver { waitingConnectionList.add( new ChannelFutureAddress( - connectionFuture, address, taskInfo.getTaskId())); + connectionFuture, address, taskId)); } } @@ -708,8 +815,127 @@ public class NettyClient implements ResetSuperstepMetricsObserver { */ public void sendWritableRequest(int destTaskId, WritableRequest request) { + ConcurrentMap<ClientRequestId, RequestInfo> requestMap; + if (limitOpenRequestsPerWorker) { + requestMap = reserveSpotForRequest(destTaskId, request); + if (requestMap == null) { + return; + } + } else { + requestMap = clientRequestIdRequestInfoMap; + } + + doSend(destTaskId, request, requestMap); + + if (limitNumberOfOpenRequests && + requestMap.size() > maxNumberOfOpenRequests) { + long startTime = System.currentTimeMillis(); + waitSomeRequests(numberOfRequestsToProceed); + timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime); + } else if (limitOpenRequestsPerWorker) { + aggregateOpenRequests.getAndIncrement(); + } + } + + /** + * Try to reserve a spot in the open requests map for a specific worker. If + * no spot is available in the open requests map, try to reserve a spot in the + * unsent requests cache. If there is no spot there too, wait until a spot + * becomes available either in the open requests map or unsent requests cache. + * This method is used when credit-based flow-control is enabled. 
+ * + * @param destTaskId Destination task id + * @param request Request to send + * @return Open request map of the given task, or null if there was no spot in + * the open request map, but there was a spot in unsent requests cache + */ + private ConcurrentMap<ClientRequestId, RequestInfo> + reserveSpotForRequest(int destTaskId, WritableRequest request) { + Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>> + pair = perWorkerOpenRequestMap.get(destTaskId); + ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight(); + final AdjustableSemaphore openRequestPermit = pair.getLeft(); + // Try to reserve a spot for the request amongst the open requests of + // the destination worker. + boolean shouldSend = openRequestPermit.tryAcquire(); + boolean shouldCache = false; + while (!shouldSend) { + // We should not send the request, and should cache the request instead. + // It may be possible that the unsent message cache is also full, so we + // should try to acquire a space on the cache, and if there is no extra + // space in unsent request cache, we should wait until some space + // become available. However, it is possible that during the time we are + // waiting on the unsent messages cache, actual buffer for open requests + // frees up space. + try { + shouldCache = unsentRequestPermit.tryAcquire(unsentWaitMsecs, + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException("sendWritableRequest: failed " + + "while waiting on the unsent request cache to have some more " + + "room for extra unsent requests!"); + } + if (shouldCache) { + break; + } + // We may have an open spot in the meantime that we were waiting on the + // unsent requests. 
+ shouldSend = openRequestPermit.tryAcquire(); + if (shouldSend) { + break; + } + // The current thread will be at this point only if it could not make + // space amongst open requests for the destination worker and has been + // timed-out in trying to acquire a space amongst unsent messages. So, + // we should report logs, report progress, and check for request + // failures. + logInfoAboutOpenRequests(-1); + context.progress(); + checkRequestsForProblems(); + } + // Either shouldSend == true or shouldCache == true + if (shouldCache) { + Deque<WritableRequest> unsentRequests = + perWorkerUnsentRequestMap.get(destTaskId); + // This synchronize block is necessary for the following reason: + // Once we are at this point, it means there was no room for this + // request to become an open request, hence we have to put it into + // unsent cache. Consider the case that since last time we checked if + // there is any room for an additional open request so far, all open + // requests are delivered and their acknowledgements are also processed. + // Now, if we put this request in the unsent cache, it is not being + // considered to become an open request, as the only one who checks + // on this matter would be the one who receives an acknowledgment for an + // open request for the destination worker. So, a lock is necessary + // to forcefully serialize the execution if this scenario is about to + // happen. + synchronized (unsentRequests) { + shouldSend = openRequestPermit.tryAcquire(); + if (!shouldSend) { + aggregateUnsentRequests.getAndIncrement(); + unsentRequests.add(request); + return null; + } + } + } + return requestMap; + } + + /** + * Sends a request. 
+ * + * @param destTaskId destination to send the request to + * @param request request itself + * @param requestMap the map that should hold the request, used for tracking + * the request later + */ + private void doSend(int destTaskId, WritableRequest request, + ConcurrentMap<ClientRequestId, RequestInfo> requestMap) { InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId); - if (clientRequestIdRequestInfoMap.isEmpty()) { + if ((limitOpenRequestsPerWorker && + aggregateOpenRequests.get() == 0) || + (!limitOpenRequestsPerWorker && + requestMap.isEmpty())) { inboundByteCounter.resetAll(); outboundByteCounter.resetAll(); } @@ -728,7 +954,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver { addressRequestIdGenerator.getNextRequestId(remoteServer)); ClientRequestId clientRequestId = new ClientRequestId(destTaskId, request.getRequestId()); - RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent( + RequestInfo oldRequestInfo = requestMap.putIfAbsent( clientRequestId, newRequestInfo); if (oldRequestInfo != null) { throw new IllegalStateException("sendWritableRequest: Impossible to " + @@ -737,7 +963,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver { } } if (request.getSerializedSize() > - requestSizeWarningThreshold * sendBufferSize) { + requestSizeWarningThreshold * sendBufferSize) { LOG.warn("Creating large request of type " + request.getClass() + ", size " + request.getSerializedSize() + " bytes. 
Check netty buffer size."); @@ -745,13 +971,178 @@ public class NettyClient implements ResetSuperstepMetricsObserver { ChannelFuture writeFuture = channel.write(request); newRequestInfo.setWriteFuture(writeFuture); writeFuture.addListener(logErrorListener); + } - if (limitNumberOfOpenRequests && - clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) { - long startTime = System.currentTimeMillis(); - waitSomeRequests(numberOfRequestsToProceed); - timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime); + /** + * Get the response flag from a response + * + * @param response response received + * @return AckSignalFlag coming with the response + */ + public static AckSignalFlag getAckSignalFlag(short response) { + return AckSignalFlag.values()[(response >> 15) & 1]; + } + + /** + * Whether response specifies that credit should be ignored + * + * @param response response received + * @return true iff credit should be ignored, false otherwise + */ + public static boolean shouldIgnoreCredit(short response) { + return ((short) ((response >> 14) & 1)) == 1; + } + + /** + * Get the credit from a response + * + * @param response response received + * @return credit from the received response + */ + public static short getCredit(short response) { + return (short) (response & 0x3FFF); + } + + /** + * Calculate/Build a response for given flag and credit + * + * @param flag AckSignalFlag that should be sent with the response + * @param ignoreCredit whether this response specified to ignore the credit + * @param credit if credit is not ignored, what should be the max credit + * @return the response to be sent out along with the acknowledgement + */ + public static short calculateResponse(AckSignalFlag flag, + boolean ignoreCredit, short credit) { + if (credit > 0x3FFF || credit < 1) { + LOG.warn("calculateResponse: value of the credit is " + credit + + " while it should be in range [1, " + 0x3FFF + "]"); + credit = (short) Math.max((short) Math.min(credit, 
0x3FFF), 1); } + return (short) ((flag.ordinal() << 15) | + ((ignoreCredit ? 1 : 0) << 14) | (credit & 0x3FFF)); + } + + /** + * Handle receipt of a message. Called by response handler. + * + * @param senderId Id of sender of the message + * @param requestId Id of the request + * @param response Actual response + * @param shouldDrop Drop the message? + */ + public void messageReceived(int senderId, long requestId, short response, + boolean shouldDrop) { + if (shouldDrop) { + if (!limitOpenRequestsPerWorker) { + synchronized (clientRequestIdRequestInfoMap) { + clientRequestIdRequestInfoMap.notifyAll(); + } + } + return; + } + boolean shouldIgnoreCredit = shouldIgnoreCredit(response); + short credit = getCredit(response); + AckSignalFlag responseFlag = getAckSignalFlag(response); + if (responseFlag == AckSignalFlag.DUPLICATE_REQUEST) { + LOG.info("messageReceived: Already completed request (taskId = " + + senderId + ", requestId = " + requestId + ")"); + } else if (responseFlag != AckSignalFlag.NEW_REQUEST) { + throw new IllegalStateException( + "messageReceived: Got illegal response " + response); + } + RequestInfo requestInfo; + int numOpenRequests; + if (limitOpenRequestsPerWorker) { + requestInfo = + processResponse(senderId, requestId, shouldIgnoreCredit, credit); + numOpenRequests = aggregateOpenRequests.get(); + if (numOpenRequests == 0) { + synchronized (aggregateOpenRequests) { + aggregateOpenRequests.notifyAll(); + } + } + } else { + requestInfo = clientRequestIdRequestInfoMap + .remove(new ClientRequestId(senderId, requestId)); + numOpenRequests = clientRequestIdRequestInfoMap.size(); + } + if (requestInfo == null) { + LOG.info("messageReceived: Already received response for (taskId = " + + senderId + ", requestId = " + requestId + ")"); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" + + requestInfo + ". 
Waiting on " + numOpenRequests + " requests"); + } + } + + if (!limitOpenRequestsPerWorker) { + // Help waitSomeRequests() to finish faster + synchronized (clientRequestIdRequestInfoMap) { + clientRequestIdRequestInfoMap.notifyAll(); + } + } + } + + /** + * Process a response for credit based flow-control. Open up a spot in the + * open request map of the sender, and send from unsent requests cache as much + * as there is open space in the open request map. + * + * @param senderId the sender from whom we got the response + * @param requestId the id of the request the response belongs to + * @param ignoreCredit whether credit based mechanism should take action + * @param credit what should be the credit if credit based mechanism is + * enabled + * @return Info of the request the response belongs to + */ + private RequestInfo processResponse(int senderId, long requestId, + boolean ignoreCredit, short credit) { + Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, RequestInfo>> + pair = perWorkerOpenRequestMap.get(senderId); + ConcurrentMap<ClientRequestId, RequestInfo> requestMap = pair.getRight(); + RequestInfo requestInfo = + requestMap.remove(new ClientRequestId(senderId, requestId)); + AdjustableSemaphore openRequestPermit = pair.getLeft(); + if (requestInfo != null) { + openRequestPermit.release(); + aggregateOpenRequests.getAndDecrement(); + if (!ignoreCredit) { + openRequestPermit.setMaxPermits(credit); + } + Deque<WritableRequest> requestDeque = + perWorkerUnsentRequestMap.get(senderId); + // Since we received a response and we changed the credit of the sender + // client, we may be able to send some more requests to the sender + // client. So, we try to send as much request as we can to the sender + // client. 
+ while (true) { + WritableRequest request; + synchronized (requestDeque) { + request = requestDeque.pollFirst(); + if (request == null) { + break; + } + // See whether the sender client has any unused credit + if (!openRequestPermit.tryAcquire()) { + requestDeque.offerFirst(request); + break; + } + } + // At this point, we have a request, and we reserved a credit for the + // sender client. So, we send the request to the client and update + // the state. + doSend(senderId, request, requestMap); + if (aggregateUnsentRequests.decrementAndGet() == 0) { + synchronized (aggregateUnsentRequests) { + aggregateUnsentRequests.notifyAll(); + } + } + aggregateOpenRequests.getAndIncrement(); + unsentRequestPermit.release(); + } + } + return requestInfo; } /** @@ -760,7 +1151,12 @@ public class NettyClient implements ResetSuperstepMetricsObserver { * @throws InterruptedException */ public void waitAllRequests() { - waitSomeRequests(0); + if (limitOpenRequestsPerWorker) { + waitSomeRequests(aggregateUnsentRequests); + waitSomeRequests(aggregateOpenRequests); + } else { + waitSomeRequests(0); + } if (LOG.isInfoEnabled()) { LOG.info("waitAllRequests: Finished all requests. " + inboundByteCounter.getMetrics() + "\n" + outboundByteCounter @@ -769,6 +1165,36 @@ public class NettyClient implements ResetSuperstepMetricsObserver { } /** + * Wait for some requests to complete. The aggregate number of such requests + * is given. Periodically check the state of current open requests. If there + * is an issue in any of them, re-send the request. 
+ * + * @param aggregateRequests object keeping aggregate number of requests to + * wait for + */ + private void waitSomeRequests(final AtomicInteger aggregateRequests) { + while (true) { + synchronized (aggregateRequests) { + if (aggregateRequests.get() == 0) { + break; + } + try { + aggregateRequests.wait(waitingRequestMsecs); + } catch (InterruptedException e) { + throw new IllegalStateException("waitSomeRequests: failed while " + + "waiting on open/cached requests"); + } + if (aggregateRequests.get() == 0) { + break; + } + } + logInfoAboutOpenRequests(-1); + context.progress(); + checkRequestsForProblems(); + } + } + + /** * Ensure that at most maxOpenRequests are not complete. Periodically, * check the state of every request. If we find the connection failed, * re-establish it and re-send the request. @@ -803,30 +1229,55 @@ public class NettyClient implements ResetSuperstepMetricsObserver { * @param maxOpenRequests Maximum number of requests which can be not complete */ private void logInfoAboutOpenRequests(int maxOpenRequests) { + int numOpenRequests = limitOpenRequestsPerWorker ? + aggregateOpenRequests.get() : + clientRequestIdRequestInfoMap.size(); if (LOG.isInfoEnabled() && requestLogger.isPrintable()) { LOG.info("logInfoAboutOpenRequests: Waiting interval of " + - waitingRequestMsecs + " msecs, " + - clientRequestIdRequestInfoMap.size() + - " open requests, waiting for it to be <= " + maxOpenRequests + - ", " + inboundByteCounter.getMetrics() + "\n" + + waitingRequestMsecs + " msecs, " + numOpenRequests + + " open requests, " + (limitOpenRequestsPerWorker ? 
+ (aggregateUnsentRequests.get() + " unsent requests, ") : + ("waiting for it to be <= " + maxOpenRequests + ", ")) + + inboundByteCounter.getMetrics() + "\n" + outboundByteCounter.getMetrics()); - if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) { - for (Map.Entry<ClientRequestId, RequestInfo> entry : - clientRequestIdRequestInfoMap.entrySet()) { - LOG.info("logInfoAboutOpenRequests: Waiting for request " + - entry.getKey() + " - " + entry.getValue()); + if (numOpenRequests < MAX_REQUESTS_TO_LIST) { + if (limitOpenRequestsPerWorker) { + for (Pair<AdjustableSemaphore, + ConcurrentMap<ClientRequestId, RequestInfo>> + pair : perWorkerOpenRequestMap.values()) { + for (Map.Entry<ClientRequestId, RequestInfo> request : + pair.getRight().entrySet()) { + LOG.info("logInfoAboutOpenRequests: Waiting for request " + + request.getKey() + " - " + request.getValue()); + } + } + } else { + for (Map.Entry<ClientRequestId, RequestInfo> entry : + clientRequestIdRequestInfoMap.entrySet()) { + LOG.info("logInfoAboutOpenRequests: Waiting for request " + + entry.getKey() + " - " + entry.getValue()); + } } } // Count how many open requests each task has Map<Integer, Integer> openRequestCounts = Maps.newHashMap(); - for (ClientRequestId clientRequestId : - clientRequestIdRequestInfoMap.keySet()) { - int taskId = clientRequestId.getDestinationTaskId(); - Integer currentCount = openRequestCounts.get(taskId); - openRequestCounts.put(taskId, - (currentCount == null ? 
0 : currentCount) + 1); + if (limitOpenRequestsPerWorker) { + for (Map.Entry<Integer, Pair<AdjustableSemaphore, + ConcurrentMap<ClientRequestId, RequestInfo>>> + entry : perWorkerOpenRequestMap.entrySet()) { + openRequestCounts + .put(entry.getKey(), entry.getValue().getRight().size()); + } + } else { + for (ClientRequestId clientRequestId : clientRequestIdRequestInfoMap + .keySet()) { + int taskId = clientRequestId.getDestinationTaskId(); + Integer currentCount = openRequestCounts.get(taskId); + openRequestCounts + .put(taskId, (currentCount == null ? 0 : currentCount) + 1); + } } // Sort it in decreasing order of number of open requests List<Map.Entry<Integer, Integer>> sorted = @@ -870,11 +1321,29 @@ public class NettyClient implements ResetSuperstepMetricsObserver { System.currentTimeMillis())) { return; } + if (limitOpenRequestsPerWorker) { + for (Pair<AdjustableSemaphore, ConcurrentMap<ClientRequestId, + RequestInfo>> pair : perWorkerOpenRequestMap.values()) { + checkRequestsForProblemsInMap(pair.getRight()); + } + } else { + checkRequestsForProblemsInMap(clientRequestIdRequestInfoMap); + } + } + + /** + * Check if there are open requests amongst particular set of requests, which + * have been sent a long time ago, and if so, resend them. 
+ * @param requestMap The map of requests we want to check for their problem + */ + private void checkRequestsForProblemsInMap( + ConcurrentMap<ClientRequestId, RequestInfo> requestMap) { List<ClientRequestId> addedRequestIds = Lists.newArrayList(); List<RequestInfo> addedRequestInfos = Lists.newArrayList(); // Check all the requests for problems for (Map.Entry<ClientRequestId, RequestInfo> entry : - clientRequestIdRequestInfoMap.entrySet()) { + requestMap.entrySet()) { RequestInfo requestInfo = entry.getValue(); ChannelFuture writeFuture = requestInfo.getWriteFuture(); // Request wasn't sent yet @@ -906,11 +1375,10 @@ public class NettyClient implements ResetSuperstepMetricsObserver { ClientRequestId requestId = addedRequestIds.get(i); RequestInfo requestInfo = addedRequestInfos.get(i); - if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == - null) { + if (requestMap.put(requestId, requestInfo) == null) { LOG.warn("checkRequestsForProblems: Request " + requestId + " completed prior to sending the next request"); - clientRequestIdRequestInfoMap.remove(requestId); + requestMap.remove(requestId); } InetSocketAddress remoteServer = requestInfo.getDestinationAddress(); Channel channel = getNextChannel(remoteServer); @@ -975,4 +1443,48 @@ public class NettyClient implements ResetSuperstepMetricsObserver { } } } + + /** + * @return Maximum number of open requests for each worker (user-defined + * value) + */ + public short getMaxOpenRequestsPerWorker() { + return (short) maxOpenRequestsPerWorker; + } + + /** + * Implementation of a semaphore where number of available permits can change + */ + private static final class AdjustableSemaphore extends Semaphore { + /** Maximum number of available permits */ + private int maxPermits; + + /** + * Constructor + * @param permits initial number of available permits + */ + public AdjustableSemaphore(int permits) { + super(permits); + maxPermits = permits; + } + + /** + * Adjusts the maximum number of available 
permits. + * + * @param newMax max number of permits + */ + public synchronized void setMaxPermits(int newMax) { + checkState(newMax >= 0, "setMaxPermits: number of permits cannot be " + + "less than 0"); + int delta = newMax - this.maxPermits; + if (delta > 0) { + // Releasing semaphore to make room for 'delta' more permits + release(delta); + } else if (delta < 0) { + // Reducing number of permits in the semaphore + reducePermits(-delta); + } + this.maxPermits = newMax; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java index 2a89109..065935d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java @@ -180,4 +180,12 @@ else[HADOOP_NON_SECURE]*/ } /*end[HADOOP_NON_SECURE]*/ + + /** + * @return Maximum number of open requests for each worker (user-defined + * value) + */ + public short getMaxOpenRequestsPerWorker() { + return nettyClient.getMaxOpenRequestsPerWorker(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java new file mode 100644 index 0000000..4f75021 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AckSignalFlag.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.netty.handler; + +/** + * Flag used to differentiate acknowledgement flags. + */ +public enum AckSignalFlag { + /** The request is/was new */ + NEW_REQUEST, + /** The request is/was not new, and is/was already processed */ + DUPLICATE_REQUEST +} http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java index 9aa88ae..96b096f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java @@ -49,6 +49,16 @@ public class MasterRequestServerHandler extends } @Override + protected short getCurrentMaxCredit() { + return 0; + } + + @Override + protected boolean shouldIgnoreCredit(int taskId) { + return true; + } + + @Override public void processRequest(MasterRequest request) { request.doRequest(commHandler); } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index d75870a..ab96714 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -18,6 +18,7 @@ package org.apache.giraph.comm.netty.handler; +import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; @@ -40,7 +41,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES public abstract class RequestServerHandler<R> extends ChannelInboundHandlerAdapter { /** Number of bytes in the encoded response */ - public static final int RESPONSE_BYTES = 13; + public static final int RESPONSE_BYTES = 14; /** Time class to use */ private static Time TIME = SystemTime.get(); /** Class logger */ @@ -58,6 +59,8 @@ public abstract class RequestServerHandler<R> extends private long startProcessingNanoseconds = -1; /** Handler for uncaught exceptions */ private final Thread.UncaughtExceptionHandler exceptionHandler; + /** Do we have a limit on the number of open requests per worker */ + private final boolean limitOpenRequestsPerWorker; /** * Constructor @@ -76,6 +79,8 @@ public abstract class RequestServerHandler<R> extends closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf); this.myTaskInfo = myTaskInfo; this.exceptionHandler = exceptionHandler; + this.limitOpenRequestsPerWorker = + NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); } 
@Override @@ -98,7 +103,7 @@ public abstract class RequestServerHandler<R> extends } // Only execute this request exactly once - int alreadyDone = 1; + AckSignalFlag alreadyDone = AckSignalFlag.DUPLICATE_REQUEST; if (workerRequestReservedMap.reserveRequest( request.getClientId(), request.getRequestId())) { @@ -113,7 +118,7 @@ public abstract class RequestServerHandler<R> extends ", " + request.getType() + " took " + Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns"); } - alreadyDone = 0; + alreadyDone = AckSignalFlag.NEW_REQUEST; } else { LOG.info("messageReceived: Request id " + request.getRequestId() + " from client " + @@ -126,12 +131,38 @@ public abstract class RequestServerHandler<R> extends ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES); buffer.writeInt(myTaskInfo.getTaskId()); buffer.writeLong(request.getRequestId()); - buffer.writeByte(alreadyDone); + short signal; + if (limitOpenRequestsPerWorker) { + signal = NettyClient.calculateResponse(alreadyDone, + shouldIgnoreCredit(request.getClientId()), getCurrentMaxCredit()); + } else { + signal = (short) alreadyDone.ordinal(); + } + buffer.writeShort(signal); ctx.write(buffer); } /** + * Get the maximum number of open requests per worker (credit) at the moment + * the method is called. This number should generally depend on the available + * memory and processing rate. + * + * @return maximum number of open requests for each worker + */ + protected abstract short getCurrentMaxCredit(); + + /** + * Whether we should ignore credit-based flow control in communicating with + * task with a given id. Generally, communication with master node does not + * require any flow-control mechanism. 
+ * + * @param taskId id of the task on the other end of the communication + * @return true if credit should be ignored, false otherwise + */ + protected abstract boolean shouldIgnoreCredit(int taskId); + + /** * Set the flag indicating already closed first request */ private static void setAlreadyClosedFirstRequest() { http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java index f0fd1e5..54cb201 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java @@ -18,11 +18,10 @@ package org.apache.giraph.comm.netty.handler; +import org.apache.giraph.comm.netty.NettyClient; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; -import java.util.concurrent.ConcurrentMap; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -41,22 +40,16 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter { private static volatile boolean ALREADY_DROPPED_FIRST_RESPONSE = false; /** Drop first response (used for simulating failure) */ private final boolean dropFirstResponse; - /** Outstanding worker request map */ - private final ConcurrentMap<ClientRequestId, - RequestInfo> workerIdOutstandingRequestMap; + /** Netty client that does the actual I/O and keeps track of open requests */ + private final NettyClient nettyClient; /** * Constructor. 
- * - * @param workerIdOutstandingRequestMap Map of worker ids to outstanding - * requests + * @param nettyClient Client that does the actual I/O * @param conf Configuration */ - public ResponseClientHandler( - ConcurrentMap<ClientRequestId, RequestInfo> - workerIdOutstandingRequestMap, - Configuration conf) { - this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap; + public ResponseClientHandler(NettyClient nettyClient, Configuration conf) { + this.nettyClient = nettyClient; dropFirstResponse = NETTY_SIMULATE_FIRST_RESPONSE_FAILED.get(conf); } @@ -64,61 +57,34 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof ByteBuf)) { - throw new IllegalStateException("messageReceived: Got a " + + throw new IllegalStateException("channelRead: Got a " + "non-ByteBuf message " + msg); } ByteBuf buf = (ByteBuf) msg; int senderId = -1; long requestId = -1; - int response = -1; + short response = -1; try { senderId = buf.readInt(); requestId = buf.readLong(); - response = buf.readByte(); + response = buf.readShort(); } catch (IndexOutOfBoundsException e) { throw new IllegalStateException( "channelRead: Got IndexOutOfBoundsException ", e); } ReferenceCountUtil.release(buf); - + boolean shouldDrop = false; // Simulate a failed response on the first response (if desired) if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) { - LOG.info("messageReceived: Simulating dropped response " + response + + LOG.info("channelRead: Simulating dropped response " + response + " for request " + requestId); setAlreadyDroppedFirstResponse(); - synchronized (workerIdOutstandingRequestMap) { - workerIdOutstandingRequestMap.notifyAll(); - } - return; - } - - if (response == 1) { - LOG.info("messageReceived: Already completed request (taskId = " + - senderId + ", requestId = " + requestId + ")"); - } else if (response != 0) { - throw new IllegalStateException( 
- "messageReceived: Got illegal response " + response); - } - - RequestInfo requestInfo = workerIdOutstandingRequestMap.remove( - new ClientRequestId(senderId, requestId)); - if (requestInfo == null) { - LOG.info("messageReceived: Already received response for (taskId = " + - senderId + ", requestId = " + requestId + ")"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" + - requestInfo + ". Waiting on " + workerIdOutstandingRequestMap - .size() + " requests"); - } + shouldDrop = true; } - // Help NettyClient#waitSomeRequests() to finish faster - synchronized (workerIdOutstandingRequestMap) { - workerIdOutstandingRequestMap.notifyAll(); - } + nettyClient.messageReceived(senderId, requestId, response, shouldDrop); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java index 574e413..18e79ce 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java @@ -18,6 +18,7 @@ package org.apache.giraph.comm.netty.handler; +import org.apache.giraph.comm.netty.NettyWorkerClient; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.requests.WorkerRequest; @@ -58,6 +59,17 @@ public class WorkerRequestServerHandler<I extends WritableComparable, } @Override + protected short getCurrentMaxCredit() { + return ((NettyWorkerClient) (serverData.getServiceWorker() + .getWorkerClient())).getMaxOpenRequestsPerWorker(); + } + 
+ @Override + protected boolean shouldIgnoreCredit(int taskId) { + return taskId == serverData.getServiceWorker().getMasterInfo().getTaskId(); + } + + @Override public void processRequest(WorkerRequest<I, V, E> request) { request.doRequest(serverData); } http://git-wip-us.apache.org/repos/asf/giraph/blob/97e26e65/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 0462770..b782fe5 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -20,6 +20,7 @@ package org.apache.giraph.comm; import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.netty.NettyServer; +import org.apache.giraph.comm.netty.handler.AckSignalFlag; import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; import org.apache.giraph.comm.requests.SendPartitionMutationsRequest; import org.apache.giraph.comm.requests.SendVertexRequest; @@ -312,4 +313,14 @@ public class RequestTest { } assertEquals(55, keySum); } + + @Test + + public void creditBasedResponseTest() throws IOException { + short response = NettyClient.calculateResponse(AckSignalFlag.NEW_REQUEST, + true, (short) 256); + assertEquals(NettyClient.getAckSignalFlag(response), AckSignalFlag.NEW_REQUEST); + assertEquals(NettyClient.shouldIgnoreCredit(response), true); + assertEquals(NettyClient.getCredit(response), (short) 256); + } } \ No newline at end of file
