This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b3f533d726 Optimize the logic of zkDataListener (#10445)
b3f533d726 is described below
commit b3f533d726953d0f3a97c207d1555bd4047eb165
Author: 灼华 <[email protected]>
AuthorDate: Fri Aug 12 15:35:18 2022 +0800
Optimize the logic of zkDataListener (#10445)
---
.../configcenter/TreePathDynamicConfiguration.java | 4 +-
.../file/FileSystemDynamicConfiguration.java | 2 +-
.../support/zookeeper/CacheListener.java | 94 ++++++----------------
.../support/zookeeper/ZookeeperDataListener.java | 76 +++++++++++++++++
.../zookeeper/ZookeeperDynamicConfiguration.java | 32 +++++---
.../ZookeeperDynamicConfigurationFactory.java | 9 ---
6 files changed, 124 insertions(+), 93 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java
index d311a908f5..cee63ea7db 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/TreePathDynamicConfiguration.java
@@ -94,7 +94,7 @@ public abstract class TreePathDynamicConfiguration extends
AbstractDynamicConfig
@Override
public final void addListener(String key, String group,
ConfigurationListener listener) {
String pathKey = buildPathKey(group, key);
- doAddListener(pathKey, listener);
+ doAddListener(pathKey, listener, key, group);
}
@Override
@@ -111,7 +111,7 @@ public abstract class TreePathDynamicConfiguration extends
AbstractDynamicConfig
protected abstract Collection<String> doGetConfigKeys(String groupPath);
- protected abstract void doAddListener(String pathKey,
ConfigurationListener listener);
+ protected abstract void doAddListener(String pathKey,
ConfigurationListener listener, String key, String group);
protected abstract void doRemoveListener(String pathKey,
ConfigurationListener listener);
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java
index 815e714025..cd43c54a31 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfiguration.java
@@ -402,7 +402,7 @@ public class FileSystemDynamicConfiguration extends
TreePathDynamicConfiguration
}
@Override
- protected void doAddListener(String pathKey, ConfigurationListener
listener) {
+ protected void doAddListener(String pathKey, ConfigurationListener
listener, String key, String group) {
doInListener(pathKey, (configFilePath, listeners) -> {
if (listeners.isEmpty()) { // If no element, it indicates
watchService was registered before
ThrowableConsumer.execute(configFilePath, configFile -> {
diff --git
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
index fe34bd5577..7c2bdf21fb 100644
---
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
+++
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java
@@ -16,94 +16,50 @@
*/
package org.apache.dubbo.configcenter.support.zookeeper;
-import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
-import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.remoting.zookeeper.DataListener;
-import org.apache.dubbo.remoting.zookeeper.EventType;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-
-public class CacheListener implements DataListener {
+/**
+ * one path has one zookeeperDataListener
+ */
+public class CacheListener {
- private Map<String, Set<ConfigurationListener>> keyListeners = new
ConcurrentHashMap<>();
- private String rootPath;
+ private Map<String, ZookeeperDataListener> pathKeyListeners = new
ConcurrentHashMap<>();
- public CacheListener(String rootPath) {
- this.rootPath = rootPath;
+ public CacheListener() {
}
- public void addListener(String key, ConfigurationListener
configurationListener) {
- Set<ConfigurationListener> listeners =
keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
- listeners.add(configurationListener);
+ public ZookeeperDataListener addListener(String pathKey,
ConfigurationListener configurationListener, String key, String group) {
+ ZookeeperDataListener zookeeperDataListener =
pathKeyListeners.computeIfAbsent(pathKey,
+ _pathKey -> new ZookeeperDataListener(_pathKey, key, group));
+ zookeeperDataListener.addListener(configurationListener);
+ return zookeeperDataListener;
}
- public void removeListener(String key, ConfigurationListener
configurationListener) {
- Set<ConfigurationListener> listeners = keyListeners.get(key);
- if (listeners != null) {
- listeners.remove(configurationListener);
+ public ZookeeperDataListener removeListener(String pathKey,
ConfigurationListener configurationListener) {
+ ZookeeperDataListener zookeeperDataListener =
pathKeyListeners.get(pathKey);
+ if (zookeeperDataListener != null) {
+ zookeeperDataListener.removeListener(configurationListener);
+ if (CollectionUtils.isEmpty(zookeeperDataListener.getListeners()))
{
+ pathKeyListeners.remove(pathKey);
+ }
}
+ return zookeeperDataListener;
}
- public Set<ConfigurationListener> getConfigurationListeners(String key) {
- return keyListeners.get(key);
- }
-
- /**
- * This is used to convert a configuration nodePath into a key
- * TODO doc
- *
- * @param path
- * @return key (nodePath less the config root path)
- */
- private String pathToKey(String path) {
- if (StringUtils.isEmpty(path)) {
- return path;
- }
- String groupKey = path.replace(rootPath + PATH_SEPARATOR,
"").replaceAll(PATH_SEPARATOR, DOT_SEPARATOR);
- return groupKey.substring(groupKey.indexOf(DOT_SEPARATOR) + 1);
+ public ZookeeperDataListener getCachedListener(String pathKey) {
+ return pathKeyListeners.get(pathKey);
}
- private String getGroup(String path) {
- if (!StringUtils.isEmpty(path)) {
- int beginIndex = path.indexOf(rootPath + PATH_SEPARATOR);
- if (beginIndex > -1) {
- String remain = path.substring((rootPath +
PATH_SEPARATOR).length());
- int endIndex = remain.lastIndexOf(PATH_SEPARATOR);
- if (endIndex > -1) {
- return remain.substring(0, endIndex);
- }
- }
- }
- return path;
+ public Map<String, ZookeeperDataListener> getPathKeyListeners() {
+ return pathKeyListeners;
}
-
- @Override
- public void dataChanged(String path, Object value, EventType eventType) {
- ConfigChangeType changeType;
- if (EventType.NodeCreated.equals(eventType)) {
- changeType = ConfigChangeType.ADDED;
- } else if (value == null) {
- changeType = ConfigChangeType.DELETED;
- } else {
- changeType = ConfigChangeType.MODIFIED;
- }
- String key = pathToKey(path);
-
- ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key,
getGroup(path), (String) value, changeType);
- Set<ConfigurationListener> listeners = keyListeners.get(path);
- if (CollectionUtils.isNotEmpty(listeners)) {
- listeners.forEach(listener -> listener.process(configChangeEvent));
- }
+ public void clear() {
+ pathKeyListeners.clear();
}
}
diff --git
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDataListener.java
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDataListener.java
new file mode 100644
index 0000000000..c489ca0e33
--- /dev/null
+++
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDataListener.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.configcenter.support.zookeeper;
+
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.zookeeper.DataListener;
+import org.apache.dubbo.remoting.zookeeper.EventType;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * one path has multi configurationListeners
+ */
+public class ZookeeperDataListener implements DataListener {
+ private String path;
+ private String key;
+ private String group;
+ private Set<ConfigurationListener> listeners;
+
+ public ZookeeperDataListener(String path, String key, String group) {
+ this.path = path;
+ this.key = key;
+ this.group = group;
+ this.listeners = new CopyOnWriteArraySet<>();
+ }
+
+ public void addListener(ConfigurationListener configurationListener) {
+ listeners.add(configurationListener);
+ }
+
+ public void removeListener(ConfigurationListener configurationListener) {
+ listeners.remove(configurationListener);
+ }
+
+ public Set<ConfigurationListener> getListeners() {
+ return listeners;
+ }
+
+ @Override
+ public void dataChanged(String path, Object value, EventType eventType) {
+ if (!this.path.equals(path)) {
+ return;
+ }
+ ConfigChangeType changeType;
+ if (EventType.NodeCreated.equals(eventType)) {
+ changeType = ConfigChangeType.ADDED;
+ } else if (value == null) {
+ changeType = ConfigChangeType.DELETED;
+ } else {
+ changeType = ConfigChangeType.MODIFIED;
+ }
+ ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key,
group, (String) value, changeType);
+ if (CollectionUtils.isNotEmpty(listeners)) {
+ listeners.forEach(listener -> listener.process(configChangeEvent));
+ }
+ }
+
+}
diff --git
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
index b335184e18..b427709dc8 100644
---
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
+++
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java
@@ -29,15 +29,12 @@ import
org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-/**
- *
- */
public class ZookeeperDynamicConfiguration extends
TreePathDynamicConfiguration {
private Executor executor;
@@ -51,7 +48,7 @@ public class ZookeeperDynamicConfiguration extends
TreePathDynamicConfiguration
ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter
zookeeperTransporter) {
super(url);
- this.cacheListener = new CacheListener(rootPath);
+ this.cacheListener = new CacheListener();
final String threadName = this.getClass().getSimpleName();
this.executor = new
ThreadPoolExecutor(DEFAULT_ZK_EXECUTOR_THREADS_NUM,
DEFAULT_ZK_EXECUTOR_THREADS_NUM,
@@ -78,6 +75,13 @@ public class ZookeeperDynamicConfiguration extends
TreePathDynamicConfiguration
@Override
protected void doClose() throws Exception {
+ // remove data listener
+ Map<String, ZookeeperDataListener> pathKeyListeners =
cacheListener.getPathKeyListeners();
+ for (Map.Entry<String, ZookeeperDataListener> entry :
pathKeyListeners.entrySet()) {
+ zkClient.removeDataListener(entry.getKey(), entry.getValue());
+ }
+ cacheListener.clear();
+
// zkClient is shared in framework, should not close it here
// zkClient.close();
// See:
org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
@@ -129,17 +133,21 @@ public class ZookeeperDynamicConfiguration extends
TreePathDynamicConfiguration
}
@Override
- protected void doAddListener(String pathKey, ConfigurationListener
listener) {
- cacheListener.addListener(pathKey, listener);
- zkClient.addDataListener(pathKey, cacheListener, executor);
+ protected void doAddListener(String pathKey, ConfigurationListener
listener, String key, String group) {
+ ZookeeperDataListener cachedListener =
cacheListener.getCachedListener(pathKey);
+ if (cachedListener != null) {
+ cachedListener.addListener(listener);
+ } else {
+ ZookeeperDataListener addedListener =
cacheListener.addListener(pathKey, listener, key, group);
+ zkClient.addDataListener(pathKey, addedListener, executor);
+ }
}
@Override
protected void doRemoveListener(String pathKey, ConfigurationListener
listener) {
- cacheListener.removeListener(pathKey, listener);
- Set<ConfigurationListener> configurationListeners =
cacheListener.getConfigurationListeners(pathKey);
- if (CollectionUtils.isEmpty(configurationListeners)) {
- zkClient.removeDataListener(pathKey, cacheListener);
+ ZookeeperDataListener zookeeperDataListener =
cacheListener.removeListener(pathKey, listener);
+ if (CollectionUtils.isEmpty(zookeeperDataListener.getListeners())) {
+ zkClient.removeDataListener(pathKey, zookeeperDataListener);
}
}
}
diff --git
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
index 309cf43faf..962c7bd028 100644
---
a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
+++
b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java
@@ -19,13 +19,9 @@ package org.apache.dubbo.configcenter.support.zookeeper;
import org.apache.dubbo.common.URL;
import
org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
-import org.apache.dubbo.common.extension.DisableInject;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
import org.apache.dubbo.rpc.model.ApplicationModel;
-/**
- *
- */
public class ZookeeperDynamicConfigurationFactory extends
AbstractDynamicConfigurationFactory {
private ZookeeperTransporter zookeeperTransporter;
@@ -37,11 +33,6 @@ public class ZookeeperDynamicConfigurationFactory extends
AbstractDynamicConfigu
this.zookeeperTransporter =
ZookeeperTransporter.getExtension(applicationModel);
}
- @DisableInject
- public void setZookeeperTransporter(ZookeeperTransporter
zookeeperTransporter) {
- this.zookeeperTransporter = zookeeperTransporter;
- }
-
@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);