This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f21a96feaa12448ec27e3ce460d5da960bc2fca7
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 5 16:21:33 2023 +0300

    [improve][proxy] Implement graceful shutdown for Pulsar Proxy (#20011)
    
    (cherry picked from commit 0c9a866f948ed6636050fac110563ad4e64bb3b1)
---
 .../apache/pulsar/proxy/server/ProxyService.java   | 76 ++++++++++++++++++----
 1 file changed, 63 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 2fb3fd67446..6f456686030 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.proxy.server;
 
 import static java.util.Objects.requireNonNull;
@@ -184,7 +185,7 @@ public class ProxyService implements Closeable {
 
         statsExecutor = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("proxy-stats-executor"));
-        statsExecutor.schedule(()->{
+        statsExecutor.schedule(() -> {
             this.clientCnxs.forEach(cnx -> {
                 if (cnx.getDirectProxyHandler() != null
                         && 
cnx.getDirectProxyHandler().getInboundChannelRequestsRate() != null) {
@@ -216,7 +217,7 @@ public class ProxyService implements Closeable {
             pulsarResources = new PulsarResources(localMetadataStore, 
configMetadataStore);
             discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, 
pulsarResources);
             authorizationService = new 
AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig),
-                                                            pulsarResources);
+                    pulsarResources);
         }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
@@ -258,7 +259,7 @@ public class ProxyService implements Closeable {
         }
 
         final String hostname =
-            
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress());
+                
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress());
 
         if (proxyConfig.getServicePort().isPresent()) {
             this.serviceUrl = String.format("pulsar://%s:%d/", hostname, 
getListenPort().get());
@@ -345,18 +346,38 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
-        dnsAddressResolverGroup.close();
+        if (listenChannel != null) {
+            try {
+                listenChannel.close().sync();
+            } catch (InterruptedException e) {
+                LOG.info("Shutdown of listenChannel interrupted");
+                Thread.currentThread().interrupt();
+            }
+        }
 
-        if (discoveryProvider != null) {
-            discoveryProvider.close();
+        if (listenChannelTls != null) {
+            try {
+                listenChannelTls.close().sync();
+            } catch (InterruptedException e) {
+                LOG.info("Shutdown of listenChannelTls interrupted");
+                Thread.currentThread().interrupt();
+            }
         }
 
-        if (listenChannel != null) {
-            listenChannel.close();
+        // Don't accept any new connections
+        try {
+            acceptorGroup.shutdownGracefully().sync();
+        } catch (InterruptedException e) {
+            LOG.info("Shutdown of acceptorGroup interrupted");
+            Thread.currentThread().interrupt();
         }
 
-        if (listenChannelTls != null) {
-            listenChannelTls.close();
+        closeAllConnections();
+
+        dnsAddressResolverGroup.close();
+
+        if (discoveryProvider != null) {
+            discoveryProvider.close();
         }
 
         if (statsExecutor != null) {
@@ -384,10 +405,39 @@ public class ProxyService implements Closeable {
                 throw new IOException(e);
             }
         }
-        acceptorGroup.shutdownGracefully();
-        workerGroup.shutdownGracefully();
+        try {
+            workerGroup.shutdownGracefully().sync();
+        } catch (InterruptedException e) {
+            LOG.info("Shutdown of workerGroup interrupted");
+            Thread.currentThread().interrupt();
+        }
         for (EventLoopGroup group : extensionsWorkerGroups) {
-            group.shutdownGracefully();
+            try {
+                group.shutdownGracefully().sync();
+            } catch (InterruptedException e) {
+                LOG.info("Shutdown of {} interrupted", group);
+                Thread.currentThread().interrupt();
+            }
+        }
+        LOG.info("ProxyService closed.");
+    }
+
+    private void closeAllConnections() {
+        try {
+            workerGroup.submit(() -> {
+                // Close all the connections
+                if (!clientCnxs.isEmpty()) {
+                    LOG.info("Closing {} proxy connections, including 
connections to brokers", clientCnxs.size());
+                    for (ProxyConnection clientCnx : clientCnxs) {
+                        clientCnx.ctx().close();
+                    }
+                } else {
+                    LOG.info("No proxy connections to close");
+                }
+            }).sync();
+        } catch (InterruptedException e) {
+            LOG.info("Closing of connections interrupted");
+            Thread.currentThread().interrupt();
         }
     }
 

Reply via email to