This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 677f1ae6975d5ddd9fc509cc689fa18281fd6091
Author: Michael Blow <[email protected]>
AuthorDate: Thu Jun 29 18:34:59 2023 -0400

    [NO ISSUE][HYR][HTTP] Prevent Netty buffer leaks on close() errors
    
    Change-Id: Ibe5836eb61b3f01b8d95820f20c55269e98a3118
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17624
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Murtadha Al Hubail <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../org/apache/hyracks/api/util/InvokeUtil.java    | 40 ++++++++++++++-
 .../hyracks/http/server/ChunkedResponse.java       | 57 ++++++++++++++--------
 .../org/apache/hyracks/util/IOThrowingAction.java  | 33 +++++++++++++
 3 files changed, 108 insertions(+), 22 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 7d04cf2254..57f9750459 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.ComputingAction;
 import org.apache.hyracks.util.IDelay;
 import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.IOThrowingAction;
 import org.apache.hyracks.util.IRetryPolicy;
 import org.apache.hyracks.util.InterruptibleAction;
 import org.apache.hyracks.util.Span;
@@ -188,7 +189,7 @@ public class InvokeUtil {
         }
     }
 
-    @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, 
instanceof of exception
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) 
// catching Throwable, instanceofs
     public static void tryWithCleanups(ThrowingAction action, 
ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;
@@ -225,6 +226,43 @@ public class InvokeUtil {
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) 
// catching Throwable, instanceofs
+    public static void tryIoWithCleanups(IOThrowingAction action, 
IOThrowingAction... cleanups) throws IOException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (IOThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t 
instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof IOException) {
+            throw (IOException) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     /**
      * Runs the supplied action, after suspending any pending interruption. An 
error will be logged if
      * the action is itself interrupted.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index e00c51903e..740af2fe34 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -25,6 +25,8 @@ import java.io.PrintWriter;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.logging.log4j.Level;
@@ -45,6 +47,7 @@ import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.ReferenceCountUtil;
 
 /**
  * A chunked http response. Here is how it is expected to work:
@@ -114,28 +117,40 @@ public class ChunkedResponse implements IServletResponse {
 
     @Override
     public void close() throws IOException {
-        if (writer != null) {
-            writer.close();
-        } else {
-            outputStream.close();
-        }
-        if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
-            if (!done) {
-                respond(LastHttpContent.EMPTY_LAST_CONTENT);
-            }
-        } else {
-            // There was an error
-            if (headerSent) {
-                LOGGER.log(Level.WARN, "Error after header write of chunked 
response");
-                if (errorBuf != null) {
-                    errorBuf.release();
+        try {
+            InvokeUtil.tryIoWithCleanups(() -> {
+                if (writer != null) {
+                    writer.close();
+                } else {
+                    outputStream.close();
+                }
+                if (errorBuf == null && response.status() == 
HttpResponseStatus.OK) {
+                    if (!done) {
+                        respond(LastHttpContent.EMPTY_LAST_CONTENT);
+                    }
+                } else {
+                    // There was an error
+                    if (headerSent) {
+                        LOGGER.log(Level.WARN, "Error after header write of 
chunked response");
+                        future = ctx.channel().close().addListener(handler);
+                    } else {
+                        // we didn't send anything to the user, we need to 
send an non-chunked error response
+                        fullResponse(response.protocolVersion(), 
response.status(),
+                                errorBuf == null ? ctx.alloc().buffer(0, 0) : 
errorBuf, response.headers());
+                        // The responsibility of releasing the error buffer is 
now with the netty pipeline since it is
+                        // forwarded within the http content. We must nullify 
buffer to avoid releasing the buffer twice.
+                        errorBuf = null;
+                    }
                 }
-                future = ctx.channel().close().addListener(handler);
-            } else {
-                // we didn't send anything to the user, we need to send an 
non-chunked error response
-                fullResponse(response.protocolVersion(), response.status(),
-                        errorBuf == null ? ctx.alloc().buffer(0, 0) : 
errorBuf, response.headers());
-            }
+            }, outputStream::close, () -> {
+                ReferenceCountUtil.release(errorBuf);
+                // We must nullify buffer to avoid releasing the buffer twice 
in case of duplicate close()
+                errorBuf = null;
+            });
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
         }
         done = true;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
new file mode 100644
index 0000000000..b9430d84dd
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOThrowingAction {
+    void run() throws IOException; // NOSONAR
+
+    static ComputingAction<Void> asComputingAction(IOThrowingAction action) {
+        return () -> {
+            action.run();
+            return null;
+        };
+    }
+}

Reply via email to