Repository: giraph
Updated Branches:
  refs/heads/trunk e5f851aac -> b5284cd9b


Setting auto-read in Netty to false

Summary: By default, the auto-read flag is set to true in Netty. This means Netty
proactively reads requests as soon as they become available to a worker. However,
this behavior sometimes causes off-heap memory usage to grow continuously. This
happens specifically in the presence of a spike in the number of received
requests. In that situation, the rate at which incoming requests are
processed/handled may be lower than the rate at which they are received, leading
to a high-memory kill (CGroup kill or OOM). With the auto-read flag set to false,
we read and process requests one by one and (hopefully/presumably) let the
transport layer do the flow control (e.g. once the socket receive buffer fills
up, TCP shrinks its advertised receive window so that the sender slows down).

Test Plan:
mvn clean verify
A PageRank-like application at large scale fails with auto-read set to true and
runs successfully with auto-read set to false.
**DO NOT ACCEPT THIS DIFF.** We should do more testing and prove it is reliable.

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D57213


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b5284cd9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b5284cd9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b5284cd9

Branch: refs/heads/trunk
Commit: b5284cd9b0d66523f6aa0a57576beace90c86e55
Parents: e5f851a
Author: Hassan Eslami <[email protected]>
Authored: Tue Apr 26 10:51:13 2016 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Apr 26 10:51:13 2016 -0700

----------------------------------------------------------------------
 .../netty/handler/RequestServerHandler.java     | 26 ++++++++++++++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |  9 +++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b5284cd9/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index ab96714..1e76f2e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.netty.handler;
 
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.time.SystemTime;
@@ -31,6 +32,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
 
 /**
@@ -61,6 +64,10 @@ public abstract class RequestServerHandler<R> extends
   private final Thread.UncaughtExceptionHandler exceptionHandler;
   /** Do we have a limit on the number of open requests per worker */
   private final boolean limitOpenRequestsPerWorker;
+  /** Whether it is the first time reading/handling a request*/
+  private final AtomicBoolean firstRead = new AtomicBoolean(true);
+  /** Cached value for NETTY_AUTO_READ configuration option */
+  private final boolean nettyAutoRead;
 
   /**
    * Constructor
@@ -81,6 +88,7 @@ public abstract class RequestServerHandler<R> extends
     this.exceptionHandler = exceptionHandler;
     this.limitOpenRequestsPerWorker =
         NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
   }
 
   @Override
@@ -141,6 +149,24 @@ public abstract class RequestServerHandler<R> extends
     buffer.writeShort(signal);
 
     ctx.write(buffer);
+    // NettyServer is bootstrapped with auto-read set to true by default. After
+    // the first request is processed, we set auto-read to false. This prevents
+    // netty from reading requests continuously and putting them in off-heap
+    // memory. Instead, we will call `read` on requests one by one, so that the
+    // lower level transport layer handles the congestion if the rate of
+    // incoming requests is more than the available processing capability.
+    if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
+      ctx.channel().config().setAutoRead(false);
+    }
+  }
+
+  @Override
+  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+    if (!nettyAutoRead) {
+      ctx.read();
+    } else {
+      super.channelReadComplete(ctx);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5284cd9/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 1e51101..8335e7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -638,6 +638,15 @@ public interface GiraphConstants {
       new StrConfOption("giraph.nettyCompressionAlgorithm", "",
           "Which compression algorithm to use in netty");
 
+  /**
+   * Whether netty should pro-actively read requests and feed them to its
+   * processing pipeline
+   */
+  BooleanConfOption NETTY_AUTO_READ =
+      new BooleanConfOption("giraph.nettyAutoRead", true,
+          "Whether netty should pro-actively read requests and feed them to " +
+              "its processing pipeline");
+
   /** Max resolve address attempts */
   IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
       new IntConfOption("giraph.maxResolveAddressAttempts", 5,

Reply via email to