This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f21a96feaa12448ec27e3ce460d5da960bc2fca7 Author: Lari Hotari <[email protected]> AuthorDate: Wed Apr 5 16:21:33 2023 +0300 [improve][proxy] Implement graceful shutdown for Pulsar Proxy (#20011) (cherry picked from commit 0c9a866f948ed6636050fac110563ad4e64bb3b1) --- .../apache/pulsar/proxy/server/ProxyService.java | 76 ++++++++++++++++++---- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 2fb3fd67446..6f456686030 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.pulsar.proxy.server; import static java.util.Objects.requireNonNull; @@ -184,7 +185,7 @@ public class ProxyService implements Closeable { statsExecutor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor")); - statsExecutor.schedule(()->{ + statsExecutor.schedule(() -> { this.clientCnxs.forEach(cnx -> { if (cnx.getDirectProxyHandler() != null && cnx.getDirectProxyHandler().getInboundChannelRequestsRate() != null) { @@ -216,7 +217,7 @@ public class ProxyService implements Closeable { pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore); discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources); authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig), - pulsarResources); + pulsarResources); } ServerBootstrap bootstrap = new ServerBootstrap(); @@ -258,7 +259,7 @@ public class ProxyService implements Closeable { } final String hostname = - ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress()); + ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress()); if (proxyConfig.getServicePort().isPresent()) { this.serviceUrl = String.format("pulsar://%s:%d/", hostname, getListenPort().get()); @@ -345,18 +346,38 @@ public class ProxyService implements Closeable { } public void close() throws IOException { - dnsAddressResolverGroup.close(); + if (listenChannel != null) { + try { + listenChannel.close().sync(); + } catch (InterruptedException e) { + LOG.info("Shutdown of listenChannel interrupted"); + Thread.currentThread().interrupt(); + } + } - if (discoveryProvider != null) { - discoveryProvider.close(); + if (listenChannelTls != null) { + try { + listenChannelTls.close().sync(); + } catch (InterruptedException e) { + LOG.info("Shutdown of listenChannelTls interrupted"); + Thread.currentThread().interrupt(); + } } - if (listenChannel != null) { - listenChannel.close(); + // Don't accept any new connections + try { + acceptorGroup.shutdownGracefully().sync(); + } catch (InterruptedException e) { + LOG.info("Shutdown of acceptorGroup interrupted"); + Thread.currentThread().interrupt(); } - if (listenChannelTls != null) { - listenChannelTls.close(); + closeAllConnections(); + + dnsAddressResolverGroup.close(); + + if (discoveryProvider != null) { + discoveryProvider.close(); } if (statsExecutor != null) { @@ -384,10 +405,39 @@ public class ProxyService implements Closeable { throw new IOException(e); } } - acceptorGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); + try { + workerGroup.shutdownGracefully().sync(); + } catch (InterruptedException e) { + LOG.info("Shutdown of workerGroup interrupted"); + Thread.currentThread().interrupt(); + } for (EventLoopGroup group : extensionsWorkerGroups) { - group.shutdownGracefully(); + try { + group.shutdownGracefully().sync(); + } catch (InterruptedException e) { + LOG.info("Shutdown of {} interrupted", group); + Thread.currentThread().interrupt(); + } + } + LOG.info("ProxyService closed."); + } + + private void closeAllConnections() { + try { + workerGroup.submit(() -> { + // Close all the connections + if (!clientCnxs.isEmpty()) { + LOG.info("Closing {} proxy connections, including connections to brokers", clientCnxs.size()); + for (ProxyConnection clientCnx : clientCnxs) { + clientCnx.ctx().close(); + } + } else { + LOG.info("No proxy connections to close"); + } + }).sync(); + } catch (InterruptedException e) { + LOG.info("Closing of connections interrupted"); + Thread.currentThread().interrupt(); } }
