This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c9a866f948 [improve][proxy] Implement graceful shutdown for Pulsar Proxy (#20011)
0c9a866f948 is described below
commit 0c9a866f948ed6636050fac110563ad4e64bb3b1
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 5 16:21:33 2023 +0300
[improve][proxy] Implement graceful shutdown for Pulsar Proxy (#20011)
---
.../apache/pulsar/proxy/server/ProxyService.java | 76 ++++++++++++++++++----
1 file changed, 63 insertions(+), 13 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index dfe888d0651..4cca24f5f48 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.proxy.server;
import static java.util.Objects.requireNonNull;
@@ -188,7 +189,7 @@ public class ProxyService implements Closeable {
statsExecutor = Executors
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("proxy-stats-executor"));
- statsExecutor.schedule(()->{
+ statsExecutor.schedule(() -> {
this.clientCnxs.forEach(cnx -> {
if (cnx.getDirectProxyHandler() != null
&&
cnx.getDirectProxyHandler().getInboundChannelRequestsRate() != null) {
@@ -223,7 +224,7 @@ public class ProxyService implements Closeable {
pulsarResources = new PulsarResources(localMetadataStore,
configMetadataStore);
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig,
pulsarResources);
authorizationService = new
AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig),
- pulsarResources);
+ pulsarResources);
}
ServerBootstrap bootstrap = new ServerBootstrap();
@@ -265,7 +266,7 @@ public class ProxyService implements Closeable {
}
final String hostname =
-
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress());
+
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress());
if (proxyConfig.getServicePort().isPresent()) {
this.serviceUrl = String.format("pulsar://%s:%d/", hostname,
getListenPort().get());
@@ -352,18 +353,38 @@ public class ProxyService implements Closeable {
}
public void close() throws IOException {
- dnsAddressResolverGroup.close();
+ if (listenChannel != null) {
+ try {
+ listenChannel.close().sync();
+ } catch (InterruptedException e) {
+ LOG.info("Shutdown of listenChannel interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
- if (discoveryProvider != null) {
- discoveryProvider.close();
+ if (listenChannelTls != null) {
+ try {
+ listenChannelTls.close().sync();
+ } catch (InterruptedException e) {
+ LOG.info("Shutdown of listenChannelTls interrupted");
+ Thread.currentThread().interrupt();
+ }
}
- if (listenChannel != null) {
- listenChannel.close();
+ // Don't accept any new connections
+ try {
+ acceptorGroup.shutdownGracefully().sync();
+ } catch (InterruptedException e) {
+ LOG.info("Shutdown of acceptorGroup interrupted");
+ Thread.currentThread().interrupt();
}
- if (listenChannelTls != null) {
- listenChannelTls.close();
+ closeAllConnections();
+
+ dnsAddressResolverGroup.close();
+
+ if (discoveryProvider != null) {
+ discoveryProvider.close();
}
if (statsExecutor != null) {
@@ -391,10 +412,39 @@ public class ProxyService implements Closeable {
throw new IOException(e);
}
}
- acceptorGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
+ try {
+ workerGroup.shutdownGracefully().sync();
+ } catch (InterruptedException e) {
+ LOG.info("Shutdown of workerGroup interrupted");
+ Thread.currentThread().interrupt();
+ }
for (EventLoopGroup group : extensionsWorkerGroups) {
- group.shutdownGracefully();
+ try {
+ group.shutdownGracefully().sync();
+ } catch (InterruptedException e) {
+ LOG.info("Shutdown of {} interrupted", group);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LOG.info("ProxyService closed.");
+ }
+
+ private void closeAllConnections() {
+ try {
+ workerGroup.submit(() -> {
+ // Close all the connections
+ if (!clientCnxs.isEmpty()) {
+ LOG.info("Closing {} proxy connections, including
connections to brokers", clientCnxs.size());
+ for (ProxyConnection clientCnx : clientCnxs) {
+ clientCnx.ctx().close();
+ }
+ } else {
+ LOG.info("No proxy connections to close");
+ }
+ }).sync();
+ } catch (InterruptedException e) {
+ LOG.info("Closing of connections interrupted");
+ Thread.currentThread().interrupt();
}
}