Repository: qpid-broker-j Updated Branches: refs/heads/master a7e105751 -> 2c7ec1509
QPID-8089: [Broker-J][HTTP Management] Schedule the connector shutdown once the endpoints have closed - avoids sporadic test fail Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2c7ec150 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2c7ec150 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2c7ec150 Branch: refs/heads/master Commit: 2c7ec1509a3d3080d14a038dd6a983bf0928136d Parents: a7e1057 Author: Keith Wall <[email protected]> Authored: Mon Mar 26 14:34:51 2018 +0100 Committer: Keith Wall <[email protected]> Committed: Mon Mar 26 14:34:51 2018 +0100 ---------------------------------------------------------------------- .../management/plugin/HttpManagement.java | 103 ++++++++++++++----- 1 file changed, 79 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c7ec150/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java ---------------------------------------------------------------------- diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index b263c69..f1b781d 100644 --- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -51,6 +51,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.ssl.SslHandshakeListener; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.HttpConfiguration; @@ -74,6 +76,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.logging.messages.PortMessages; import org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter; @@ -299,6 +302,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem for (HttpPort<?> port : ports) { ServerConnector connector = createConnector(port, server); + connector.addBean(new ConnectionTrackingListener()); server.addConnector(connector); _portConnectorMap.put(port, connector); lastPort = port.getPort(); @@ -898,6 +902,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem try { connector = createConnector(port, server); + connector.addBean(new ConnectionTrackingListener()); server.addConnector(connector); connector.start(); _portConnectorMap.put(port, connector); @@ -924,37 +929,87 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem Server server = _server; if (server != null) { - ServerConnector connector = _portConnectorMap.remove(port); + final ServerConnector connector = _portConnectorMap.remove(port); if (connector != null) { - int localPort = connector.getLocalPort(); - try + final int localPort = connector.getLocalPort(); + + final ConnectionTrackingListener tracker = connector.getBean(ConnectionTrackingListener.class); + // closes the server socket - we will see no new connections arriving + connector.close(); + // minimise the timeout of endpoints so they close in a timely fashion + connector.setIdleTimeout(1); + connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1)); + LOGGER.debug("Connector has {} connection(s)", tracker.getConnectionCount()); + + final TaskExecutor taskExecutor = getBroker().getTaskExecutor(); + tracker.getAllClosedFuture().addListener(new Runnable() { - connector.close(); - } - catch (Exception e) - { - LOGGER.warn("Failed to close connector for http port {}", port, e); - } - getBroker().scheduleTask(0, TimeUnit.SECONDS, () -> { - LOGGER.debug("Stopping connector for http port {}", localPort); - try - { - connector.stop(); - } - catch (Exception e) - { - LOGGER.warn("Failed to stop connector for http port {}", localPort, e); - } - finally + @Override + public void run() { - logOperationalShutdownMessage(localPort); - _server.removeConnector(connector); + final int connectionCount = tracker.getConnectionCount(); + if (connectionCount == 0) + { + LOGGER.debug("Stopping connector for http port {}", localPort); + try + { + connector.stop(); + } + catch (Exception e) + { + LOGGER.warn("Failed to stop connector for http port {}", localPort, e); + } + finally + { + logOperationalShutdownMessage(localPort); + _server.removeConnector(connector); + } + } + else + { + LOGGER.debug("Connector still has {} connection(s)", tracker.getConnectionCount()); + connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1)); + tracker.getAllClosedFuture() + .addListener(this, taskExecutor); + } } - }); + }, taskExecutor); } } } } + + } + + private static class ConnectionTrackingListener implements Connection.Listener + { + private final Map<Connection, SettableFuture<Void>> _closeFutures = new HashMap<>(); + + @Override + public void onOpened(final Connection connection) + { + _closeFutures.put(connection, SettableFuture.create()); + } + + @Override + public void onClosed(final Connection connection) + { + SettableFuture<Void> closeFuture = _closeFutures.remove(connection); + if (closeFuture != null) + { + closeFuture.set(null); + } + } + + public ListenableFuture<List<Void>> getAllClosedFuture() + { + return Futures.allAsList(_closeFutures.values()); + } + + public int getConnectionCount() + { + return _closeFutures.size(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
