[ASTERIXDB-2361][HYR] Memory Leak Due to Netty Close Listeners

- user model changes: no
- storage format changes: no
- interface changes:
  - add IServletResponse.notifyChannelInactive()

Change-Id: I40156538d62a3c06b9ccc14338c3f554921a12b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2579
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e20c7eea
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e20c7eea
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e20c7eea

Branch: refs/heads/master
Commit: e20c7eea498263f92267f4cbc39ad9372006ff6c
Parents: 2a773fe
Author: Till Westmann <ti...@apache.org>
Authored: Mon Apr 9 18:56:45 2018 -0700
Committer: Till Westmann <ti...@apache.org>
Committed: Mon Apr 9 21:06:31 2018 -0700

----------------------------------------------------------------------
 .../hyracks/http/api/IServletResponse.java      |  8 ++++--
 .../http/server/ChunkedNettyOutputStream.java   | 10 ++-----
 .../hyracks/http/server/ChunkedResponse.java    |  5 ++++
 .../hyracks/http/server/FullResponse.java       |  6 ++++
 .../hyracks/http/server/HttpRequestHandler.java |  4 +++
 .../hyracks/http/server/HttpServerHandler.java  | 30 ++++++++++++++------
 6 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 1a7c65f..38f2d23 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -78,8 +78,12 @@ public interface IServletResponse extends Closeable {
     ChannelFuture lastContentFuture() throws IOException;
 
     /**
-     * Notifies the response that the channel has become writable
-     * became writable or unwritable. Used for flow control
+     * Notifies the response that the channel has become writable. Used for flow control
      */
     void notifyChannelWritable();
+
+    /**
+     * Notifies the response that the channel has become inactive.
+     */
+    void notifyChannelInactive();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index d5f81e5..d4f1b3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -43,12 +43,6 @@ public class ChunkedNettyOutputStream extends OutputStream {
         this.response = response;
         this.ctx = ctx;
         buffer = ctx.alloc().buffer(chunkSize);
-        // register listener for channel closed
-        ctx.channel().closeFuture().addListener(futureListener -> {
-            synchronized (ChunkedNettyOutputStream.this) {
-                ChunkedNettyOutputStream.this.notifyAll();
-            }
-        });
     }
 
     @Override
@@ -128,8 +122,8 @@ public class ChunkedNettyOutputStream extends OutputStream {
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
-                if (!ctx.channel().isOpen()) {
-                    throw new IOException("Closed channel");
+                if (!ctx.channel().isActive()) {
+                    throw new IOException("Inactive channel");
                 }
                 wait();
             } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 323a463..5a43d25 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -187,4 +187,9 @@ public class ChunkedResponse implements IServletResponse {
     public void notifyChannelWritable() {
         outputStream.resume();
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        outputStream.resume();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 598048e..90e33b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -105,4 +105,10 @@ public class FullResponse implements IServletResponse {
         // Do nothing.
         // This response is sent as a single piece
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        // Do nothing.
+        // This response is sent as a single piece
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index bf8e629..65a082c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -82,6 +82,10 @@ public class HttpRequestHandler implements Callable<Void> {
         response.notifyChannelWritable();
     }
 
+    public void notifyChannelInactive() {
+        response.notifyChannelInactive();
+    }
+
     public void reject() throws IOException {
         try {
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e20c7eea/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 2787b30..7b3d18a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -44,6 +44,9 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun
     protected final T server;
     protected final int chunkSize;
     protected HttpRequestHandler handler;
+    protected IChannelClosedHandler closeHandler;
+    protected Future<Void> task;
+    protected IServlet servlet;
 
     public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
@@ -64,10 +67,24 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun
     }
 
     @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        if (handler != null) {
+            handler.notifyChannelInactive();
+        }
+        if (closeHandler != null) {
+            closeHandler.channelClosed(server, servlet, task);
+        }
+        super.channelInactive(ctx);
+    }
+
+    @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
         FullHttpRequest request = (FullHttpRequest) msg;
+        handler = null;
+        task = null;
+        closeHandler = null;
         try {
-            IServlet servlet = server.getServlet(request);
+            servlet = server.getServlet(request);
             if (servlet == null) {
                 handleServletNotFound(ctx, request);
             } else {
@@ -94,16 +111,13 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
-        submit(ctx, servlet);
+        submit(servlet);
     }
 
-    private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException {
+    private void submit(IServlet servlet) throws IOException {
         try {
-            Future<Void> task = server.getExecutor(handler).submit(handler);
-            final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server);
-            if (closeHandler != null) {
-                ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task));
-            }
+            task = server.getExecutor(handler).submit(handler);
+            closeHandler = servlet.getChannelClosedHandler(server);
         } catch (RejectedExecutionException e) { // NOSONAR
             LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage());
             handler.reject();

Reply via email to