This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 2dab2d7fdb69a7a391b377eb02fb8d92424819fb
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Mon Jul 4 19:49:29 2022 +0800

    [3.0] Fixed the problem that the service was offline for a long time without re-registration (#10200)

    * 修复因网络抖动导致服务长时间下线没有重新注册的问题

    * lazy init CuratorWatcher executor and destroyed on shutdown

    * add lock on close
---
 .../zookeeper/curator/CuratorZookeeperClient.java | 40 +++++++++++++++++++++-
 1 file changed, 39 insertions(+), 1 deletion(-)

diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
index 975e10fd19..d1bbb5e960 100644
--- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
+++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.config.configcenter.ConfigItem;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.zookeeper.AbstractZookeeperClient;
 import org.apache.dubbo.remoting.zookeeper.ChildListener;
@@ -49,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
@@ -84,6 +87,7 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
             if (!connected) {
                 throw new IllegalStateException("zookeeper not connected");
             }
+            CuratorWatcherImpl.closed = false;
         } catch (Exception e) {
             close();
             throw new IllegalStateException(e.getMessage(), e);
@@ -256,6 +260,13 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
     public void doClose() {
         super.close();
         client.close();
+        CuratorWatcherImpl.closed = true;
+        synchronized (CuratorWatcherImpl.class) {
+            if (CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+                CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE.shutdown();
+                CuratorWatcherImpl.CURATOR_WATCHER_EXECUTOR_SERVICE = null;
+            }
+        }
     }
 
     @Override
@@ -354,10 +365,24 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
 
     static class CuratorWatcherImpl implements CuratorWatcher {
 
+        private static volatile ExecutorService CURATOR_WATCHER_EXECUTOR_SERVICE;
+
+        private static volatile boolean closed = false;
+
         private CuratorFramework client;
         private volatile ChildListener childListener;
         private String path;
 
+        private static void initExecutorIfNecessary() {
+            if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+                synchronized (CuratorWatcherImpl.class) {
+                    if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE == null) {
+                        CURATOR_WATCHER_EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(new NamedThreadFactory("Dubbo-CuratorWatcher"));
+                    }
+                }
+            }
+        }
+
         public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
             this.client = client;
             this.childListener = listener;
@@ -380,7 +405,20 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZooke
             }
 
             if (childListener != null) {
-                childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
+                Runnable task = new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            childListener.childChanged(path, client.getChildren().usingWatcher(CuratorWatcherImpl.this).forPath(path));
+                        } catch (Exception e) {
+                            logger.warn("client get children error", e);
+                        }
+                    }
+                };
+                initExecutorIfNecessary();
+                if (!closed && CURATOR_WATCHER_EXECUTOR_SERVICE != null) {
+                    CURATOR_WATCHER_EXECUTOR_SERVICE.execute(task);
+                }
             }
         }
     }
