This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 572171e51c6c52b26d4e7b58a1d0940a232d09d6 Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Jan 15 16:01:35 2021 +0300 [NO ISSUE][OTH] Support multiple addresses in http servers - user model changes: no - storage format changes: no - interface changes: no Details: - Allow binding http servers to multiple addresses. - Add test cases. Change-Id: I68f25dc5af471c7ded29f27405c311947a007947 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9624 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../org/apache/hyracks/http/server/HttpServer.java | 105 ++++++++++++++------- .../apache/hyracks/test/http/HttpServerTest.java | 49 +++++++++- 2 files changed, 116 insertions(+), 38 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 93762a6..97b8859 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -20,6 +20,9 @@ package org.apache.hyracks.http.server; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +42,7 @@ import org.apache.logging.log4j.Logger; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -50,6 +54,8 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpScheme; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; public class HttpServer { // Constants @@ -64,6 +70,7 @@ public class HttpServer { private static final int STARTING = 1; private static final int STARTED = 2; private static final int STOPPING = 3; + private static final int RECOVERING = 4; // Final members private final IChannelClosedHandler closedHandler; private final Object lock = new Object(); @@ -73,38 +80,59 @@ public class HttpServer { private final ServletRegistry servlets; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; - private final InetSocketAddress address; + private final InetSocketAddress defaultAddress; + private final List<InetSocketAddress> addresses; private final ThreadPoolExecutor executor; // Mutable members private volatile int state = STOPPED; private volatile Thread recoveryThread; - private volatile Channel channel; + private final List<Channel> channels; private Throwable cause; private HttpServerConfig config; + private final GenericFutureListener<Future<Void>> channelCloseListener = f -> { + // This listener is invoked from within a netty IO thread. Hence, we can never block it + // For simplicity, we will submit the recovery task to a different thread. We will also + // close all channels on this server and attempt to rebind them. + synchronized (lock) { + if (state != STARTED) { + return; + } + LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this); + MXHelper.logFileDescriptors(); + state = RECOVERING; + triggerRecovery(); + } + }; + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) { - this(bossGroup, workerGroup, new InetSocketAddress(port), config, null); + this(bossGroup, workerGroup, Collections.singletonList(new InetSocketAddress(port)), config, null); } public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address, - HttpServerConfig config) { - this(bossGroup, workerGroup, address, config, null); + HttpServerConfig config, IChannelClosedHandler closeHandler) { + this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler); } - public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address, + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses, HttpServerConfig config, IChannelClosedHandler closeHandler) { + if (addresses.isEmpty()) { + throw new IllegalArgumentException("no addresses specified"); + } this.bossGroup = bossGroup; this.workerGroup = workerGroup; - this.address = address; + this.addresses = addresses; + defaultAddress = addresses.get(0); this.closedHandler = closeHandler; this.config = config; + channels = new ArrayList<>(); ctx = new ConcurrentHashMap<>(); servlets = new ServletRegistry(); workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize()); int numExecutorThreads = config.getThreadCount(); executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue, runnable -> new Thread(runnable, - "HttpExecutor(port:" + address.getPort() + ")-" + threadId.getAndIncrement())); + "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement())); long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK + numExecutorThreads * config.getMaxResponseChunkSize(); LOGGER.log(Level.DEBUG, @@ -128,7 +156,7 @@ public class HttpServer { doStart(); setStarted(); } catch (Throwable e) { // NOSONAR - LOGGER.error("Failure starting an Http Server at: {}", address, e); + LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e); setFailed(e); throw e; } @@ -175,6 +203,8 @@ public class HttpServer { return "STOPPING"; case STOPPED: return "STOPPED"; + case RECOVERING: + return "RECOVERING"; default: return "UNKNOWN"; } @@ -229,10 +259,10 @@ public class HttpServer { for (IServlet servlet : servlets.getServlets()) { servlet.init(); } - channel = bind(); + bind(); } - private Channel bind() throws InterruptedException { + private void bind() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE)) @@ -240,23 +270,20 @@ public class HttpServer { .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer()); - Channel newChannel = b.bind(address).sync().channel(); - newChannel.closeFuture().addListener(f -> { - // This listener is invoked from within a netty IO thread. Hence, we can never block it - // For simplicity, we will submit the recovery task to a different thread + List<ChannelFuture> channelFutures = new ArrayList<>(); + for (InetSocketAddress address : addresses) { + channelFutures.add(b.bind(address)); + } + for (ChannelFuture future : channelFutures) { + Channel channel = future.sync().channel(); + channel.closeFuture().addListener(channelCloseListener); synchronized (lock) { - if (state != STARTED) { - return; - } - LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this); - MXHelper.logFileDescriptors(); - triggerRecovery(); + channels.add(channel); } - }); - return newChannel; + } } - private void triggerRecovery() { + private void triggerRecovery() throws InterruptedException { Thread rt = recoveryThread; if (rt != null) { try { @@ -267,7 +294,7 @@ public class HttpServer { return; } } - // try to revive the channel + // try to revive the channels recoveryThread = new Thread(this::recover); recoveryThread.start(); } @@ -275,9 +302,11 @@ public class HttpServer { public void recover() { try { synchronized (lock) { - while (state == STARTED) { + while (state == RECOVERING) { try { - channel = bind(); + closeChannels(); + bind(); + setStarted(); break; } catch (InterruptedException e) { LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e); @@ -329,10 +358,7 @@ public class HttpServer { } catch (Exception e) { LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e); } - if (channel != null) { - channel.close(); - channel.closeFuture().sync(); - } + closeChannels(); } public IServlet getServlet(FullHttpRequest request) { @@ -369,8 +395,8 @@ public class HttpServer { @Override public String toString() { - return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + address + ",\"state\":\"" + getState() - + "\"}"; + return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\"" + + getState() + "\"}"; } public HttpServerConfig getConfig() { @@ -378,6 +404,17 @@ public class HttpServer { } public InetSocketAddress getAddress() { - return address; + return defaultAddress; + } + + private void closeChannels() throws InterruptedException { + synchronized (lock) { + for (Channel channel : channels) { + channel.closeFuture().removeListener(channelCloseListener); + channel.close(); + channel.closeFuture().sync(); + } + channels.clear(); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java index 84c8c65..76438aa 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java @@ -28,6 +28,7 @@ import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -263,7 +264,10 @@ public class HttpServerTest { WebManager webMgr = new WebManager(); final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(numExecutors) .setRequestQueueSize(serverQueueSize).build(); - HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config); + List<InetSocketAddress> addresses = new ArrayList<>(); + addresses.add(new InetSocketAddress(PORT)); + addresses.add(new InetSocketAddress(PORT + 1)); + HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null); ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH }); server.addServlet(servlet); webMgr.add(server); @@ -276,12 +280,12 @@ public class HttpServerTest { } Assert.assertEquals(numRequests, SUCCESS_COUNT.get()); // close the channel - Field channelField = server.getClass().getDeclaredField("channel"); + Field channelField = server.getClass().getDeclaredField("channels"); channelField.setAccessible(true); Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread"); recoveryThreadField.setAccessible(true); - Channel channel = (Channel) channelField.get(server); - channel.close(); + List<Channel> channels = (ArrayList<Channel>) channelField.get(server); + channels.get(0).close(); Thread.sleep(1000); final int sleeps = 10; for (int i = 0; i < sleeps; i++) { @@ -409,6 +413,43 @@ public class HttpServerTest { } } + @Test + public void multiAddressServerTest() throws Exception { + final WebManager webMgr = new WebManager(); + final HttpServerConfig config = + HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build(); + List<Integer> ports = Arrays.asList(PORT, PORT + 1); + List<InetSocketAddress> addresses = new ArrayList<>(); + for (Integer port : ports) { + addresses.add(new InetSocketAddress(port)); + } + HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null); + EchoServlet servlet = new EchoServlet(server.ctx(), PATH); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + try { + for (Integer port : ports) { + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, port, + HttpServerTest.PATH, null, null); + final HttpPost postRequest = new HttpPost(uri); + final String requestBody = "test"; + final StringEntity chunkedEntity = new StringEntity(requestBody); + chunkedEntity.setChunked(true); + postRequest.setEntity(chunkedEntity); + try (CloseableHttpResponse response = httpClient.execute(postRequest)) { + final String responseBody = EntityUtils.toString(response.getEntity()); + Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code()); + Assert.assertEquals(responseBody, requestBody); + } + } + } + } finally { + webMgr.stop(); + } + } + private void request(int count) throws URISyntaxException { request(count, 0); }
