Repository: asterixdb
Updated Branches:
  refs/heads/master 5aeba9b47 -> 1a879cdf0
[ASTERIXDB-2475][OTH] Reject HTTP Pipelined Requests

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- If a client sends multiple requests on the same connection before reading the response of each request (i.e. pipelined requests), the request will be rejected and the connection will be closed.
- Add test case.
- Fix typo in method name.

Change-Id: I67c370d4d37a3e267b30e13333714605b07b7515
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3021
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ian Maxon <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1a879cdf
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1a879cdf
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1a879cdf

Branch: refs/heads/master
Commit: 1a879cdf0deb16a60f9018c292f1da10bc760bdb
Parents: 5aeba9b
Author: Murtadha Hubail <[email protected]>
Authored: Tue Nov 6 17:39:34 2018 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Thu Nov 8 21:11:00 2018 -0800

----------------------------------------------------------------------
 hyracks-fullstack/hyracks/hyracks-http/pom.xml | 6 +
 .../http/server/ChunkedNettyOutputStream.java | 2 +-
 .../hyracks/http/server/ChunkedResponse.java | 21 ++-
 .../hyracks/http/server/FullResponse.java | 9 +-
 .../hyracks/http/server/HttpRequestHandler.java | 9 +-
 .../hyracks/http/server/HttpServerHandler.java | 81 ++++++++---
 .../hyracks/http/PipelinedRequestsTest.java | 140 +++++++++++++++++++
 7 files changed, 238 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/pom.xml
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml index 7b1dc63..5d56c5e 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml @@ -62,6 +62,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore-nio</artifactId> + <version>4.4.10</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java index adea133..4f7133a 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java @@ -90,7 +90,7 @@ public class ChunkedNettyOutputStream extends OutputStream { } } } else { - response.fullReponse(buffer); + response.fullResponse(buffer); } super.close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java index b3a7587..0aadb1e 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java @@ -31,12 +31,14 @@ import org.apache.logging.log4j.Logger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; @@ -63,13 +65,16 @@ public class ChunkedResponse implements IServletResponse { private final ChannelHandlerContext ctx; private final ChunkedNettyOutputStream outputStream; private final PrintWriter writer; + private final HttpServerHandler<?> handler; private DefaultHttpResponse response; private boolean headerSent; private ByteBuf error; private ChannelFuture future; private boolean done; - public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) { + public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request, + int chunkSize) { + this.handler = handler; this.ctx = ctx; outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this); writer = new PrintWriter(outputStream); @@ -102,7 +107,7 @@ public class ChunkedResponse implements IServletResponse { writer.close(); if (error == null && response.status() 
== HttpResponseStatus.OK) { if (!done) { - future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + respond(LastHttpContent.EMPTY_LAST_CONTENT); } } else { // There was an error @@ -111,7 +116,7 @@ public class ChunkedResponse implements IServletResponse { if (error != null) { error.release(); } - future = ctx.channel().close(); + future = ctx.channel().close().addListener(handler); } else { // we didn't send anything to the user, we need to send an non-chunked error response fullResponse(response.protocolVersion(), response.status(), @@ -155,7 +160,7 @@ public class ChunkedResponse implements IServletResponse { return headerSent; } - public void fullReponse(ByteBuf buffer) { + public void fullResponse(ByteBuf buffer) { fullResponse(response.protocolVersion(), response.status(), buffer, response.headers()); } @@ -165,7 +170,7 @@ public class ChunkedResponse implements IServletResponse { // for a full response remove chunked transfer-encoding and set the content length instead fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING); fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); - future = ctx.writeAndFlush(fullResponse); + respond(fullResponse); headerSent = true; done = true; } @@ -184,4 +189,10 @@ public class ChunkedResponse implements IServletResponse { public void cancel() { outputStream.cancel(); } + + private void respond(HttpObject response) { + final ChannelPromise responseCompletionPromise = ctx.newPromise(); + responseCompletionPromise.addListener(handler); + future = ctx.writeAndFlush(response, responseCompletionPromise); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java index 55bbd30..2b1e8b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java @@ -29,6 +29,7 @@ import org.apache.hyracks.http.server.utils.HttpUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -41,9 +42,11 @@ public class FullResponse implements IServletResponse { private final ByteArrayOutputStream baos; private final PrintWriter writer; private final DefaultFullHttpResponse response; + private final HttpServerHandler<?> handler; private ChannelFuture future; - public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) { + public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) { + this.handler = handler; this.ctx = ctx; baos = new ByteArrayOutputStream(); writer = new PrintWriter(baos); @@ -56,7 +59,9 @@ public class FullResponse implements IServletResponse { writer.close(); FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray())); fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes()); - future = ctx.writeAndFlush(fullResponse); + final ChannelPromise responseCompletionPromise = ctx.newPromise(); + responseCompletionPromise.addListener(handler); + future = ctx.writeAndFlush(fullResponse, responseCompletionPromise); } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java index 1c0801c..652be7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java @@ -40,15 +40,18 @@ public class HttpRequestHandler implements Callable<Void> { private final IServlet servlet; private final IServletRequest request; private final IServletResponse response; + private final HttpServerHandler<?> handler; private boolean started = false; private boolean cancelled = false; - public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) { + public HttpRequestHandler(HttpServerHandler<?> handler, ChannelHandlerContext ctx, IServlet servlet, + IServletRequest request, int chunkSize) { + this.handler = handler; this.ctx = ctx; this.servlet = servlet; this.request = request; - response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest()) - : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize); + response = chunkSize == 0 ? 
new FullResponse(handler, ctx, request.getHttpRequest()) + : new ChunkedResponse(handler, ctx, request.getHttpRequest(), chunkSize); request.getHttpRequest().retain(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 36d79f3..e3a0d4b 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -33,23 +33,28 @@ import org.apache.logging.log4j.Logger; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; -public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> { +public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> + implements ChannelFutureListener { private static final Logger LOGGER = LogManager.getLogger(); + private static final String PIPELINED_REQUEST_ERROR_MSG = "Server doesn't support pipelined requests"; protected final T server; - protected 
final int chunkSize; - protected HttpRequestHandler handler; - protected IChannelClosedHandler closeHandler; - protected Future<Void> task; - protected IServlet servlet; + protected volatile HttpRequestHandler handler; + protected volatile Future<Void> task; + protected volatile IServlet servlet; + private volatile IChannelClosedHandler closeHandler; + private volatile boolean pipelinedRequest = false; + private final int chunkSize; public HttpServerHandler(T server, int chunkSize) { this.server = server; @@ -63,19 +68,24 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (ctx.channel().isWritable()) { - handler.notifyChannelWritable(); + final HttpRequestHandler currentHandler = handler; + if (currentHandler != null && ctx.channel().isWritable()) { + currentHandler.notifyChannelWritable(); } super.channelWritabilityChanged(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (handler != null) { - handler.notifyChannelInactive(); + final HttpRequestHandler currentHandler = handler; + if (currentHandler != null) { + currentHandler.notifyChannelInactive(); } - if (closeHandler != null) { - closeHandler.channelClosed(server, servlet, task); + final IChannelClosedHandler currentCloseHandler = closeHandler; + final IServlet currentServlet = servlet; + final Future<Void> currentTask = task; + if (currentCloseHandler != null && currentServlet != null && currentTask != null) { + currentCloseHandler.channelClosed(server, currentServlet, currentTask); } super.channelInactive(ctx); } @@ -83,9 +93,15 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { FullHttpRequest request = (FullHttpRequest) msg; - handler = null; - task = null; - closeHandler = null; + if (isPipelinedRequest()) { + 
pipelinedRequest = true; + rejectPipelinedRequestAndClose(ctx, request); + return; + } + if (request.decoderResult().isFailure()) { + respond(ctx, request, HttpResponseStatus.BAD_REQUEST); + return; + } try { servlet = server.getServlet(request); if (servlet == null) { @@ -94,7 +110,7 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun submit(ctx, servlet, request); } } catch (Exception e) { - LOGGER.log(Level.WARN, "Failure Submitting HTTP Request", e); + LOGGER.log(Level.WARN, "Failure handling HTTP request", e); respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR); } } @@ -103,7 +119,9 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun final DefaultHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status); response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0); HttpUtil.setConnectionHeader(request, response); - final ChannelFuture clientChannel = ctx.writeAndFlush(response); + final ChannelPromise responseCompletionPromise = ctx.newPromise(); + responseCompletionPromise.addListener(this); + final ChannelFuture clientChannel = ctx.writeAndFlush(response, responseCompletionPromise); if (!io.netty.handler.codec.http.HttpUtil.isKeepAlive(request)) { clientChannel.addListener(ChannelFutureListener.CLOSE); } @@ -118,7 +136,7 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun respond(ctx, request, HttpResponseStatus.BAD_REQUEST); return; } - handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize); + handler = new HttpRequestHandler(this, ctx, servlet, servletRequest, chunkSize); submit(servlet); } @@ -144,4 +162,29 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun LOGGER.log(Level.WARN, "Failure handling HTTP Request", cause); ctx.close(); } -} + + @Override + public void operationComplete(ChannelFuture future) { + if (!pipelinedRequest) { + requestHandled(); + } + } + + 
private boolean isPipelinedRequest() { + return handler != null || servlet != null || closeHandler != null || task != null; + } + + private void rejectPipelinedRequestAndClose(ChannelHandlerContext ctx, FullHttpRequest request) { + LOGGER.warn(PIPELINED_REQUEST_ERROR_MSG); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + respond(ctx, request, + new HttpResponseStatus(HttpResponseStatus.BAD_REQUEST.code(), PIPELINED_REQUEST_ERROR_MSG)); + } + + private void requestHandled() { + handler = null; + servlet = null; + task = null; + closeHandler = null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a879cdf/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java new file mode 100644 index 0000000..0c96eb6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.config.ConnectionConfig; +import org.apache.http.impl.nio.DefaultHttpClientIODispatch; +import org.apache.http.impl.nio.pool.BasicNIOConnPool; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; +import org.apache.http.message.BasicHttpRequest; +import org.apache.http.nio.protocol.BasicAsyncRequestProducer; +import org.apache.http.nio.protocol.BasicAsyncResponseConsumer; +import org.apache.http.nio.protocol.HttpAsyncRequestExecutor; +import org.apache.http.nio.protocol.HttpAsyncRequester; +import org.apache.http.nio.reactor.ConnectingIOReactor; +import org.apache.http.nio.reactor.IOEventDispatch; +import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.protocol.HttpProcessor; +import org.apache.http.protocol.HttpProcessorBuilder; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestExpectContinue; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.hyracks.http.server.HttpServer; +import org.apache.hyracks.http.server.HttpServerConfig; +import org.apache.hyracks.http.server.HttpServerConfigBuilder; +import org.apache.hyracks.http.server.InterruptOnCloseHandler; +import org.apache.hyracks.http.server.WebManager; +import org.apache.hyracks.http.servlet.SleepyServlet; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +public 
class PipelinedRequestsTest { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final int PORT = 9898; + private static final String PATH = "/"; + + @Test + public void pipelinedRequests() throws Exception { + setupServer(); + final HttpHost target = new HttpHost("localhost", PORT); + final List<BasicAsyncRequestProducer> requestProducers = + Arrays.asList(new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)), + new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH))); + final List<BasicAsyncResponseConsumer> responseConsumers = + Arrays.asList(new BasicAsyncResponseConsumer(), new BasicAsyncResponseConsumer()); + final List<HttpResponse> httpResponses = executePipelined(target, requestProducers, responseConsumers); + for (HttpResponse response : httpResponses) { + Assert.assertNotEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode()); + } + } + + private void setupServer() throws Exception { + final WebManager webMgr = new WebManager(); + final HttpServerConfig config = + HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build(); + final HttpServer server = + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config, InterruptOnCloseHandler.INSTANCE); + final SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + } + + private List<HttpResponse> executePipelined(HttpHost host, List<BasicAsyncRequestProducer> requestProducers, + List<BasicAsyncResponseConsumer> responseConsumers) throws Exception { + final List<HttpResponse> results = new ArrayList<>(); + final HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor(); + final IOEventDispatch ioEventDispatch = + new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT); + final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(); + final BasicNIOConnPool pool 
= new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT); + pool.setDefaultMaxPerRoute(1); + pool.setMaxTotal(1); + final Thread reactorThread = new Thread(() -> { + try { + ioReactor.execute(ioEventDispatch); + } catch (final IOException e) { + LOGGER.error(e); + } + }); + reactorThread.start(); + final HttpCoreContext context = HttpCoreContext.create(); + final CountDownLatch latch = new CountDownLatch(1); + final HttpProcessor httpProc = + HttpProcessorBuilder.create().add(new RequestContent()).add(new RequestTargetHost()) + .add(new RequestConnControl()).add(new RequestExpectContinue(true)).build(); + final HttpAsyncRequester requester = new HttpAsyncRequester(httpProc); + requester.executePipelined(host, requestProducers, responseConsumers, pool, context, + new FutureCallback<List<HttpResponse>>() { + @Override + public void completed(final List<HttpResponse> result) { + results.addAll(result); + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + latch.countDown(); + } + + @Override + public void cancelled() { + latch.countDown(); + } + }); + latch.await(5, TimeUnit.SECONDS); + ioReactor.shutdown(); + return results; + } +}
