This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0d6c94be0e [ISSUE #8705] Make MQClientAPIFactory shutdown async (#8706)
0d6c94be0e is described below

commit 0d6c94be0e9ebeaa16acbaf6dd29f24e9349aa74
Author: qianye <[email protected]>
AuthorDate: Thu Sep 19 20:13:12 2024 +0800

    [ISSUE #8705] Make MQClientAPIFactory shutdown async (#8706)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  5 +-
 .../client/impl/mqclient/MQClientAPIFactory.java   |  5 +-
 .../rocketmq/common/utils/AsyncShutdownHelper.java | 76 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 8a3d3dd0dc..b539b8f098 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -78,6 +78,7 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -184,9 +185,9 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
@@ -247,7 +248,7 @@ import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
 
-public class MQClientAPIImpl implements NameServerUpdateCallback {
+public class MQClientAPIImpl implements NameServerUpdateCallback, StartAndShutdown {
     private final static Logger log = LoggerFactory.getLogger(MQClientAPIImpl.class);
     private static boolean sendSmartMsg =
         Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index c68859b288..0fa31b6640 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.common.NameserverAccessConfig;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -85,9 +86,11 @@ public class MQClientAPIFactory implements StartAndShutdown {
 
     @Override
     public void shutdown() throws Exception {
+        AsyncShutdownHelper helper = new AsyncShutdownHelper();
         for (int i = 0; i < this.clientNum; i++) {
-            clients[i].shutdown();
+            helper.addTarget(clients[i]);
         }
+        helper.shutdown().await(Integer.MAX_VALUE, TimeUnit.SECONDS);
     }
 
     protected MQClientAPIExt createAndStart(String instanceName) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java b/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java
new file mode 100644
index 0000000000..da765d5e74
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsyncShutdownHelper {
+    private final AtomicBoolean shutdown;
+    private final List<Shutdown> targetList;
+
+    private CountDownLatch countDownLatch;
+
+    public AsyncShutdownHelper() {
+        this.targetList = new ArrayList<>();
+        this.shutdown = new AtomicBoolean(false);
+    }
+
+    public void addTarget(Shutdown target) {
+        if (shutdown.get()) {
+            return;
+        }
+        targetList.add(target);
+    }
+
+    public AsyncShutdownHelper shutdown() {
+        if (shutdown.get()) {
+            return this;
+        }
+        if (targetList.isEmpty()) {
+            return this;
+        }
+        this.countDownLatch = new CountDownLatch(targetList.size());
+        for (Shutdown target : targetList) {
+            Runnable runnable = () -> {
+                try {
+                    target.shutdown();
+                } catch (Exception ignored) {
+
+                } finally {
+                    countDownLatch.countDown();
+                }
+            };
+            new Thread(runnable).start();
+        }
+        return this;
+    }
+
+    public boolean await(long time, TimeUnit unit) throws InterruptedException {
+        if (shutdown.get()) {
+            return false;
+        }
+        try {
+            return this.countDownLatch.await(time, unit);
+        } finally {
+            shutdown.compareAndSet(false, true);
+        }
+    }
+}

Reply via email to