Repository: asterixdb
Updated Branches:
  refs/heads/master 46b5449de -> 4cf1f7b97
[ASTERIXDB-2033][OTH] restrict chunk size in http server

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- This change ensures no large chunks are written to direct buffers in the http server.

Change-Id: I08bac47ea28f66502b99df6fb8ff91dd85566d38
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1935
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4cf1f7b9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4cf1f7b9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4cf1f7b9

Branch: refs/heads/master
Commit: 4cf1f7b9775a85bd84c3d682277d16e056b81fb2
Parents: 46b5449
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Fri Aug 11 18:17:37 2017 -0700
Committer: abdullah alamoudi <bamou...@gmail.com>
Committed: Sat Aug 12 09:24:52 2017 -0700

----------------------------------------------------------------------
 hyracks-fullstack/hyracks/hyracks-http/pom.xml | 14 ++++-
 .../hyracks/http/server/AbstractServlet.java | 6 +++
 .../http/server/ChunkedNettyOutputStream.java | 47 +++++++---------
 .../hyracks/http/servlet/ChattyServlet.java | 56 ++++++++++++++++++++
 .../hyracks/http/test/HttpServerTest.java | 46 +++++++++++++++-
 5 files changed, 138 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4cf1f7b9/hyracks-fullstack/hyracks/hyracks-http/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 835cf61..6439adb 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml @@ -16,7 +16,8 @@ ! specific language governing permissions and limitations ! under the License. !--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.hyracks</groupId> @@ -27,6 +28,17 @@ <properties> <root.dir>${basedir}/../..</root.dir> </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>-XX:MaxDirectMemorySize=16M</argLine> + </configuration> + </plugin> + </plugins> + </build> <dependencies> <dependency> <groupId>io.netty</groupId> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4cf1f7b9/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java index 4a4c25a..30d599d 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java @@ -88,6 +88,12 @@ public abstract class AbstractServlet implements IServlet { } catch (Exception e) { LOGGER.log(Level.WARNING, "Unhandled exception", e); 
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } catch (Throwable th) { //NOSONAR Just logging and then throwing again + try { + LOGGER.log(Level.WARNING, "Unhandled throwable", th); + } catch (Throwable loggingFailure) {// NOSONAR... swallow logging failure + } + throw th; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4cf1f7b9/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java index 3456343..0066b77 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java @@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.internal.OutOfDirectMemoryError; public class ChunkedNettyOutputStream extends OutputStream { @@ -50,23 +51,25 @@ public class ChunkedNettyOutputStream extends OutputStream { @Override public void write(byte[] b, int off, int len) throws IOException { - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - if (len > buffer.capacity()) { - flush(); - flush(b, off, len); - } else { - int space = buffer.writableBytes(); - if (space >= len) { - buffer.writeBytes(b, off, len); - } else { - buffer.writeBytes(b, off, space); - flush(); - buffer.writeBytes(b, off + space, len - space); + try { + if ((off < 
0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) { + throw new IndexOutOfBoundsException(); + } + while (len > 0) { + int space = buffer.writableBytes(); + if (space >= len) { + buffer.writeBytes(b, off, len); + len = 0; // NOSONAR + } else { + buffer.writeBytes(b, off, space); + len -= space; // NOSONAR + off += space; // NOSONAR + flush(); + } } + } catch (OutOfDirectMemoryError error) {// NOSONAR + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + throw error; } } @@ -114,18 +117,6 @@ public class ChunkedNettyOutputStream extends OutputStream { } } - private void flush(byte[] buf, int offset, int len) throws IOException { - ensureWritable(); - ByteBuf aBuffer = ctx.alloc().buffer(len); - aBuffer.writeBytes(buf, offset, len); - if (response.status() == HttpResponseStatus.OK) { - response.beforeFlush(); - ctx.write(new DefaultHttpContent(aBuffer), ctx.channel().voidPromise()); - } else { - response.error(aBuffer); - } - } - private synchronized void ensureWritable() throws IOException { while (!ctx.channel().isWritable()) { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4cf1f7b9/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java new file mode 100644 index 0000000..30df003 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.servlet; + +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public class ChattyServlet extends AbstractServlet { + private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName()); + private byte[] bytes; + + public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) { + super(ctx, paths); + String line = "I don't know when to stop talking\n"; + StringBuilder responseBuilder = new StringBuilder(); + for (int i = 0; i < 100000; i++) { + responseBuilder.append(line); + } + String responseString = responseBuilder.toString(); + bytes = responseString.getBytes(); + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws Exception { + response.setStatus(HttpResponseStatus.OK); + HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8); + LOGGER.log(Level.WARNING, "I am about to flood you... 
and a single buffer is " + bytes.length + " bytes"); + for (int i = 0; i < 100; i++) { + response.outputStream().write(bytes); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4cf1f7b9/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 2076201..854980e 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -41,13 +41,16 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.StandardHttpRequestRetryHandler; import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.WebManager; +import org.apache.hyracks.http.servlet.ChattyServlet; import org.apache.hyracks.http.servlet.SlowServlet; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import io.netty.handler.codec.http.HttpResponseStatus; public class HttpServerTest { + static final boolean PRINT_TO_CONSOLE = false; static final int PORT = 9898; static final int NUM_EXECUTOR_THREADS = 16; static final int SERVER_QUEUE_SIZE = 16; @@ -60,6 +63,13 @@ public class HttpServerTest { static final AtomicInteger OTHER_COUNT = new AtomicInteger(); static final List<Thread> THREADS = new ArrayList<>(); + @Before + public void setUp() { + SUCCESS_COUNT.set(0); + UNAVAILABLE_COUNT.set(0); + OTHER_COUNT.set(0); + } + @Test public void testOverloadingServer() throws Exception { WebManager webMgr = new WebManager(); @@ -83,6 +93,31 @@ public class HttpServerTest { } @Test + public void 
testChattyServer() throws Exception { + int numRequests = 64; + int numExecutors = 32; + int serverQueueSize = 32; + WebManager webMgr = new WebManager(); + HttpServer server = + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + try { + request(numRequests); + for (Thread thread : THREADS) { + thread.join(); + } + Assert.assertEquals(numRequests, SUCCESS_COUNT.get()); + Assert.assertEquals(0, UNAVAILABLE_COUNT.get()); + Assert.assertEquals(0, OTHER_COUNT.get()); + } finally { + webMgr.stop(); + } + } + + @Test public void testMalformedString() throws Exception { WebManager webMgr = new WebManager(); HttpServer server = @@ -136,8 +171,15 @@ public class HttpServerTest { } else { OTHER_COUNT.incrementAndGet(); } - InputStream responseStream = response.getEntity().getContent(); - IOUtils.closeQuietly(responseStream); + InputStream in = response.getEntity().getContent(); + if (PRINT_TO_CONSOLE) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line = null; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + IOUtils.closeQuietly(in); } catch (Throwable th) { th.printStackTrace(); }