Repository: giraph Updated Branches: refs/heads/trunk e5f851aac -> b5284cd9b
Setting auto-read in Netty to false Summary: By default, the auto-read flag is set to true in Netty. This means Netty proactively reads requests as they become available to a worker. However, this behavior sometimes causes the off-heap memory to increase continuously. This happens specifically in the presence of a spike in the amount of received requests. In that situation, the processing/handling rate of incoming requests may be less than the request receipt rate, leading to a high-memory kill (CGroup kill or OOM). With the auto-read flag set to false, we read and process requests one by one, (hopefully/presumably) letting the transport layer do the flow control (i.e. dropping packets or reducing the congestion window of TCP). Test Plan: mvn clean verify. A PageRank-like application at large scale fails with auto-read set to true, and successfully runs with auto-read set to false. **DO NOT ACCEPT THIS DIFF.** We should do more testing and prove it is reliable. Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D57213 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b5284cd9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b5284cd9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b5284cd9 Branch: refs/heads/trunk Commit: b5284cd9b0d66523f6aa0a57576beace90c86e55 Parents: e5f851a Author: Hassan Eslami <[email protected]> Authored: Tue Apr 26 10:51:13 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Apr 26 10:51:13 2016 -0700 ---------------------------------------------------------------------- .../netty/handler/RequestServerHandler.java | 26 ++++++++++++++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 9 +++++++ 2 files changed, 35 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/b5284cd9/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index ab96714..1e76f2e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -20,6 +20,7 @@ package org.apache.giraph.comm.netty.handler; import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; import org.apache.giraph.time.SystemTime; @@ -31,6 +32,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED; /** @@ -61,6 +64,10 @@ public abstract class RequestServerHandler<R> extends private final Thread.UncaughtExceptionHandler exceptionHandler; /** Do we have a limit on the number of open requests per worker */ private final boolean limitOpenRequestsPerWorker; + /** Whether it is the first time reading/handling a request*/ + private final AtomicBoolean firstRead = new AtomicBoolean(true); + /** Cached value for NETTY_AUTO_READ configuration option */ + private final boolean nettyAutoRead; /** * Constructor @@ -81,6 +88,7 @@ public abstract class RequestServerHandler<R> extends this.exceptionHandler = exceptionHandler; this.limitOpenRequestsPerWorker = 
NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); + this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf); } @Override @@ -141,6 +149,24 @@ public abstract class RequestServerHandler<R> extends buffer.writeShort(signal); ctx.write(buffer); + // NettyServer is bootstrapped with auto-read set to true by default. After + // the first request is processed, we set auto-read to false. This prevents + // netty from reading requests continuously and putting them in off-heap + // memory. Instead, we will call `read` on requests one by one, so that the + // lower level transport layer handles the congestion if the rate of + // incoming requests is more than the available processing capability. + if (!nettyAutoRead && firstRead.compareAndSet(true, false)) { + ctx.channel().config().setAutoRead(false); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (!nettyAutoRead) { + ctx.read(); + } else { + super.channelReadComplete(ctx); + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/b5284cd9/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 1e51101..8335e7e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -638,6 +638,15 @@ public interface GiraphConstants { new StrConfOption("giraph.nettyCompressionAlgorithm", "", "Which compression algorithm to use in netty"); + /** + * Whether netty should pro-actively read requests and feed them to its + * processing pipeline + */ + BooleanConfOption NETTY_AUTO_READ = + new BooleanConfOption("giraph.nettyAutoRead", true, + "Whether netty should pro-actively read requests and feed them to " + + "its processing 
pipeline"); + /** Max resolve address attempts */ IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS = new IntConfOption("giraph.maxResolveAddressAttempts", 5,
