This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0d6c94be0e [ISSUE #8705] Make MQClientAPIFactory shutdown async (#8706)
0d6c94be0e is described below
commit 0d6c94be0e9ebeaa16acbaf6dd29f24e9349aa74
Author: qianye <[email protected]>
AuthorDate: Thu Sep 19 20:13:12 2024 +0800
[ISSUE #8705] Make MQClientAPIFactory shutdown async (#8706)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 5 +-
.../client/impl/mqclient/MQClientAPIFactory.java | 5 +-
.../rocketmq/common/utils/AsyncShutdownHelper.java | 76 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 8a3d3dd0dc..b539b8f098 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -78,6 +78,7 @@ import
org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -184,9 +185,9 @@ import
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
@@ -247,7 +248,7 @@ import
org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-public class MQClientAPIImpl implements NameServerUpdateCallback {
+public class MQClientAPIImpl implements NameServerUpdateCallback,
StartAndShutdown {
private final static Logger log =
LoggerFactory.getLogger(MQClientAPIImpl.class);
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg",
"true"));
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index c68859b288..0fa31b6640 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -85,9 +86,11 @@ public class MQClientAPIFactory implements StartAndShutdown {
@Override
public void shutdown() throws Exception {
+ AsyncShutdownHelper helper = new AsyncShutdownHelper();
for (int i = 0; i < this.clientNum; i++) {
- clients[i].shutdown();
+ helper.addTarget(clients[i]);
}
+ helper.shutdown().await(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
protected MQClientAPIExt createAndStart(String instanceName) {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java
b/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java
new file mode 100644
index 0000000000..da765d5e74
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/utils/AsyncShutdownHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsyncShutdownHelper {
+ private final AtomicBoolean shutdown;
+ private final List<Shutdown> targetList;
+
+ private CountDownLatch countDownLatch;
+
+ public AsyncShutdownHelper() {
+ this.targetList = new ArrayList<>();
+ this.shutdown = new AtomicBoolean(false);
+ }
+
+ public void addTarget(Shutdown target) {
+ if (shutdown.get()) {
+ return;
+ }
+ targetList.add(target);
+ }
+
+ public AsyncShutdownHelper shutdown() {
+ if (shutdown.get()) {
+ return this;
+ }
+ if (targetList.isEmpty()) {
+ return this;
+ }
+ this.countDownLatch = new CountDownLatch(targetList.size());
+ for (Shutdown target : targetList) {
+ Runnable runnable = () -> {
+ try {
+ target.shutdown();
+ } catch (Exception ignored) {
+
+ } finally {
+ countDownLatch.countDown();
+ }
+ };
+ new Thread(runnable).start();
+ }
+ return this;
+ }
+
+ public boolean await(long time, TimeUnit unit) throws InterruptedException
{
+ if (shutdown.get()) {
+ return false;
+ }
+ try {
+ return this.countDownLatch.await(time, unit);
+ } finally {
+ shutdown.compareAndSet(false, true);
+ }
+ }
+}