This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 677f1ae6975d5ddd9fc509cc689fa18281fd6091 Author: Michael Blow <[email protected]> AuthorDate: Thu Jun 29 18:34:59 2023 -0400 [NO ISSUE][HYR][HTTP] Prevent Netty buffer leaks on close() errors Change-Id: Ibe5836eb61b3f01b8d95820f20c55269e98a3118 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17624 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Al Hubail <[email protected]> Tested-by: Jenkins <[email protected]> --- .../org/apache/hyracks/api/util/InvokeUtil.java | 40 ++++++++++++++- .../hyracks/http/server/ChunkedResponse.java | 57 ++++++++++++++-------- .../org/apache/hyracks/util/IOThrowingAction.java | 33 +++++++++++++ 3 files changed, 108 insertions(+), 22 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index 7d04cf2254..57f9750459 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.ComputingAction; import org.apache.hyracks.util.IDelay; import org.apache.hyracks.util.IOInterruptibleAction; +import org.apache.hyracks.util.IOThrowingAction; import org.apache.hyracks.util.IRetryPolicy; import org.apache.hyracks.util.InterruptibleAction; import org.apache.hyracks.util.Span; @@ -188,7 +189,7 @@ public class InvokeUtil { } } - @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception { Throwable savedT = null; boolean suppressedInterrupted = false; @@ -225,6 +226,43 @@ public class InvokeUtil { } } + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs + public static void tryIoWithCleanups(IOThrowingAction action, IOThrowingAction... cleanups) throws IOException { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (IOThrowingAction cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else if (savedT instanceof IOException) { + throw (IOException) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + /** * Runs the supplied action, after suspending any pending interruption. An error will be logged if * the action is itself interrupted. diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java index e00c51903e..740af2fe34 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java @@ -25,6 +25,8 @@ import java.io.PrintWriter; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.utils.HttpUtil; import org.apache.logging.log4j.Level; @@ -45,6 +47,7 @@ import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.ReferenceCountUtil; /** * A chunked http response. Here is how it is expected to work: @@ -114,28 +117,40 @@ public class ChunkedResponse implements IServletResponse { @Override public void close() throws IOException { - if (writer != null) { - writer.close(); - } else { - outputStream.close(); - } - if (errorBuf == null && response.status() == HttpResponseStatus.OK) { - if (!done) { - respond(LastHttpContent.EMPTY_LAST_CONTENT); - } - } else { - // There was an error - if (headerSent) { - LOGGER.log(Level.WARN, "Error after header write of chunked response"); - if (errorBuf != null) { - errorBuf.release(); + try { + InvokeUtil.tryIoWithCleanups(() -> { + if (writer != null) { + writer.close(); + } else { + outputStream.close(); + } + if (errorBuf == null && response.status() == HttpResponseStatus.OK) { + if (!done) { + respond(LastHttpContent.EMPTY_LAST_CONTENT); + } + } else { + // There was an error + if (headerSent) { + LOGGER.log(Level.WARN, "Error after header write of chunked response"); + future = ctx.channel().close().addListener(handler); + } else { + // we didn't send anything to the user, we need to send an non-chunked error response + fullResponse(response.protocolVersion(), response.status(), + errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers()); + // The responsibility of releasing the error buffer is now with the netty pipeline since it is + // forwarded within the http content. We must nullify buffer to avoid releasing the buffer twice. + errorBuf = null; + } } - future = ctx.channel().close().addListener(handler); - } else { - // we didn't send anything to the user, we need to send an non-chunked error response - fullResponse(response.protocolVersion(), response.status(), - errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers()); - } + }, outputStream::close, () -> { + ReferenceCountUtil.release(errorBuf); + // We must nullify buffer to avoid releasing the buffer twice in case of duplicate close() + errorBuf = null; + }); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw HyracksDataException.create(e); } done = true; } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java new file mode 100644 index 0000000000..b9430d84dd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; + +@FunctionalInterface +public interface IOThrowingAction { + void run() throws IOException; // NOSONAR + + static ComputingAction<Void> asComputingAction(IOThrowingAction action) { + return () -> { + action.run(); + return null; + }; + } +}
