This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 504f71f Enhance configcenter & metadata-report modules (#133)
504f71f is described below
commit 504f71fb9921c14cbe327b21bbcb2bcae51e1b1c
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Jul 18 18:46:21 2022 +0800
Enhance configcenter & metadata-report modules (#133)
* add consul configcenter
* add etcd configcenter
* opt pom
* fix compile
* add consul metadata report
* add etcd metadata report
* opt pom
---
.../dubbo-configcenter-consul/pom.xml | 48 ++++
.../dubbo/configcenter/consul/ConsulConstants.java | 34 +++
.../consul/ConsulDynamicConfiguration.java | 177 ++++++++++++++
.../consul/ConsulDynamicConfigurationFactory.java | 32 +++
...config.configcenter.DynamicConfigurationFactory | 1 +
.../consul/ConsulDynamicConfigurationTest.java | 123 ++++++++++
.../dubbo-configcenter-etcd/pom.xml | 74 ++++++
.../support/etcd/EtcdDynamicConfiguration.java | 197 ++++++++++++++++
.../etcd/EtcdDynamicConfigurationFactory.java | 33 +++
...config.configcenter.DynamicConfigurationFactory | 1 +
.../support/etcd/EtcdDynamicConfigurationTest.java | 154 ++++++++++++
dubbo-configcenter-extensions/pom.xml | 5 +
dubbo-extensions-dependencies-bom/pom.xml | 25 ++
.../dubbo-metadata-report-consul/pom.xml | 49 ++++
.../store/consul/ConsulMetadataReport.java | 133 +++++++++++
.../store/consul/ConsulMetadataReportFactory.java | 32 +++
...che.dubbo.metadata.report.MetadataReportFactory | 1 +
.../dubbo-metadata-report-etcd/pom.xml | 71 ++++++
.../metadata/store/etcd/EtcdMetadataReport.java | 146 ++++++++++++
.../store/etcd/EtcdMetadataReportFactory.java | 50 ++++
...che.dubbo.metadata.report.MetadataReportFactory | 1 +
.../store/etcd/EtcdMetadata4TstService.java | 28 +++
.../store/etcd/EtcdMetadataReportTest.java | 261 +++++++++++++++++++++
dubbo-metadata-report-extensions/pom.xml | 6 +
24 files changed, 1682 insertions(+)
diff --git a/dubbo-configcenter-extensions/dubbo-configcenter-consul/pom.xml
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/pom.xml
new file mode 100644
index 0000000..e8f1e6b
--- /dev/null
+++ b/dubbo-configcenter-extensions/dubbo-configcenter-consul/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-configcenter-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-configcenter-consul</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <!-- No versions are declared in this block; presumably they are managed by the parent / dependencies BOM (confirm there). -->
+ <dependencies>
+ <!-- Dubbo core API used by the config center implementation. -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ </dependency>
+ <!-- Consul client library (Consul, KeyValueClient, KVCache) used by ConsulDynamicConfiguration. -->
+ <dependency>
+ <groupId>com.orbitz.consul</groupId>
+ <artifactId>consul-client</artifactId>
+ </dependency>
+ <!-- Test-scoped embedded Consul; referenced by the (currently commented-out) ConsulDynamicConfigurationTest. -->
+ <dependency>
+ <groupId>com.pszymczyk.consul</groupId>
+ <artifactId>embedded-consul</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulConstants.java
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulConstants.java
new file mode 100644
index 0000000..3a8f0db
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.consul;
+
+/**
+ * Constants shared by the Consul-based extensions (config center, metadata
+ * and registry modules): URL parameter names and their fallback values.
+ */
+public interface ConsulConstants {
+
+    /** Name of the URL parameter that carries the Consul ACL token. */
+    String TOKEN = "token";
+
+    /** Name of the URL parameter that overrides the watch timeout. */
+    String WATCH_TIMEOUT = "consul-watch-timeout";
+
+    /** Watch timeout used when the URL does not provide {@link #WATCH_TIMEOUT}. */
+    int DEFAULT_WATCH_TIMEOUT = 60_000;
+
+    /** Port value that is treated as "no port given". */
+    int INVALID_PORT = 0;
+
+    /** Port used when the URL port equals {@link #INVALID_PORT}. */
+    int DEFAULT_PORT = 8500;
+}
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
new file mode 100644
index 0000000..b82aa8a
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import
org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import com.google.common.base.Charsets;
+import com.google.common.net.HostAndPort;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+
+/**
+ * Config center implementation backed by the Consul key/value store.
+ *
+ * <p>Reads and writes go straight through a {@link KeyValueClient}; change
+ * notifications are delivered by one {@link KVCache} per watched path, which is
+ * created lazily when the first listener for that path is registered and is
+ * stopped when this configuration is closed.
+ */
+public class ConsulDynamicConfiguration extends TreePathDynamicConfiguration {
+    private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class);
+
+    private final Consul client;
+
+    private final KeyValueClient kvClient;
+
+    /**
+     * Value of the {@code consul-watch-timeout} URL parameter, handed to
+     * {@link KVCache#newCache} unchanged.
+     * NOTE(review): consul-client appears to interpret that argument as seconds,
+     * while the default here is 60 * 1000 - confirm the intended unit.
+     */
+    private final int watchTimeout;
+
+    /** One watcher (and therefore one KVCache) per watched path. */
+    private final ConcurrentMap<String, ConsulListener> watchers = new ConcurrentHashMap<>();
+
+    /**
+     * Builds the Consul client from the given URL.
+     *
+     * @param url config center URL; host and port locate the Consul agent (the
+     *            default port is used when the URL carries none) and the optional
+     *            {@code token} parameter is applied as ACL token
+     */
+    public ConsulDynamicConfiguration(URL url) {
+        super(url);
+        watchTimeout = url.getParameter(ConsulConstants.WATCH_TIMEOUT, ConsulConstants.DEFAULT_WATCH_TIMEOUT);
+        String host = url.getHost();
+        int port = ConsulConstants.INVALID_PORT != url.getPort() ? url.getPort() : ConsulConstants.DEFAULT_PORT;
+        Consul.Builder builder = Consul.builder()
+                .withHostAndPort(HostAndPort.fromParts(host, port));
+        String token = url.getParameter(ConsulConstants.TOKEN, (String) null);
+        if (StringUtils.isNotEmpty(token)) {
+            builder.withAclToken(token);
+        }
+        client = builder.build();
+        kvClient = client.keyValueClient();
+    }
+
+    /**
+     * Reads the raw value stored under {@code key}.
+     *
+     * @return the UTF-8 decoded value, or {@code null} when the key has no value
+     */
+    @Override
+    public String getInternalProperty(String key) {
+        logger.info("getting config from: " + key);
+        return kvClient.getValueAsString(key, Charsets.UTF_8).orElse(null);
+    }
+
+    @Override
+    protected boolean doPublishConfig(String pathKey, String content) throws Exception {
+        return kvClient.putValue(pathKey, content);
+    }
+
+    @Override
+    protected String doGetConfig(String pathKey) throws Exception {
+        return getInternalProperty(pathKey);
+    }
+
+    @Override
+    protected boolean doRemoveConfig(String pathKey) throws Exception {
+        kvClient.deleteKey(pathKey);
+        return true;
+    }
+
+    /**
+     * Lists the last path segment of every key found below {@code groupPath},
+     * excluding the group key itself.
+     */
+    @Override
+    protected Collection<String> doGetConfigKeys(String groupPath) {
+        List<String> keys = kvClient.getKeys(groupPath);
+        List<String> configKeys = new LinkedList<>();
+        if (CollectionUtils.isNotEmpty(keys)) {
+            for (String storedKey : keys) {
+                if (!sameKey(storedKey, groupPath)) {
+                    configKeys.add(storedKey.substring(storedKey.lastIndexOf(PATH_SEPARATOR) + 1));
+                }
+            }
+        }
+        return configKeys;
+    }
+
+    @Override
+    protected void doAddListener(String pathKey, ConfigurationListener listener) {
+        logger.info("register listener " + listener.getClass() + " for config with key: " + pathKey);
+        ConsulListener watcher = watchers.computeIfAbsent(pathKey, k -> new ConsulListener(pathKey));
+        watcher.addListener(listener);
+    }
+
+    @Override
+    protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
+        logger.info("unregister listener " + listener.getClass() + " for config with key: " + pathKey);
+        ConsulListener watcher = watchers.get(pathKey);
+        if (watcher != null) {
+            watcher.removeListener(listener);
+        }
+    }
+
+    /**
+     * Stops every running cache and then destroys the Consul client. The client
+     * is destroyed even if stopping a cache fails.
+     */
+    @Override
+    protected void doClose() throws Exception {
+        try {
+            for (ConsulListener watcher : watchers.values()) {
+                watcher.stop();
+            }
+        } finally {
+            watchers.clear();
+            client.destroy();
+        }
+    }
+
+    /**
+     * Compares two Consul keys while ignoring leading slashes.
+     * NOTE(review): consul-client appears to trim a leading '/' before talking to
+     * the agent, so keys coming back from Consul may lack the slash that Dubbo
+     * path keys start with - confirm against the client version in use.
+     */
+    private static boolean sameKey(String left, String right) {
+        return stripLeadingSlashes(left).equals(stripLeadingSlashes(right));
+    }
+
+    /** Returns {@code key} without any leading '/' characters. */
+    private static String stripLeadingSlashes(String key) {
+        int start = 0;
+        while (start < key.length() && key.charAt(start) == '/') {
+            start++;
+        }
+        return key.substring(start);
+    }
+
+    /**
+     * Bridges one {@link KVCache} to the Dubbo listeners registered for a path.
+     */
+    private class ConsulListener implements KVCache.Listener<String, Value> {
+
+        private final KVCache kvCache;
+        // Concurrent set: listeners are added/removed by callers while notify()
+        // iterates on the cache's callback thread. Iteration order is unspecified.
+        private final Set<ConfigurationListener> listeners = ConcurrentHashMap.newKeySet();
+        private final String normalizedKey;
+
+        public ConsulListener(String normalizedKey) {
+            this.normalizedKey = normalizedKey;
+            this.kvCache = KVCache.newCache(kvClient, normalizedKey, watchTimeout);
+            this.kvCache.addListener(this);
+            this.kvCache.start();
+        }
+
+        /**
+         * Forwards the new value of the watched key to every registered listener
+         * as a {@link ConfigChangeType#MODIFIED} event.
+         */
+        @Override
+        public void notify(Map<String, Value> newValues) {
+            // The cache reports every key below the watched root path, so keep
+            // only the entry for the watched key itself and decode its value.
+            newValues.values().stream()
+                    .filter(value -> sameKey(value.getKey(), normalizedKey))
+                    .findAny()
+                    .flatMap(value -> value.getValueAsString())
+                    .ifPresent(content -> {
+                        ConfigChangedEvent event =
+                                new ConfigChangedEvent(normalizedKey, getGroup(), content, ConfigChangeType.MODIFIED);
+                        listeners.forEach(l -> l.process(event));
+                    });
+        }
+
+        private void addListener(ConfigurationListener listener) {
+            this.listeners.add(listener);
+        }
+
+        private void removeListener(ConfigurationListener listener) {
+            this.listeners.remove(listener);
+        }
+
+        /** Stops the underlying cache so it no longer polls Consul. */
+        private void stop() throws Exception {
+            kvCache.stop();
+        }
+    }
+}
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java
new file mode 100644
index 0000000..980a156
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.consul;
+
+import org.apache.dubbo.common.URL;
+import
org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+
+/**
+ * Factory that produces {@link ConsulDynamicConfiguration} instances; it is
+ * the config center factory registered for Consul.
+ */
+public class ConsulDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+    /**
+     * Creates a new Consul-backed configuration for the given config center URL.
+     *
+     * @param url the config center URL passed on to {@link ConsulDynamicConfiguration}
+     * @return a freshly constructed {@link ConsulDynamicConfiguration}
+     */
+    @Override
+    protected DynamicConfiguration createDynamicConfiguration(URL url) {
+        final DynamicConfiguration consulConfiguration = new ConsulDynamicConfiguration(url);
+        return consulConfiguration;
+    }
+}
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
new file mode 100644
index 0000000..b7a5091
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.configcenter.consul.ConsulDynamicConfigurationFactory
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java
new file mode 100644
index 0000000..60112b9
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/test/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationTest.java
@@ -0,0 +1,123 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.dubbo.configcenter.consul;
+//
+//import org.apache.dubbo.common.URL;
+//
+//import com.google.common.net.HostAndPort;
+//import com.orbitz.consul.Consul;
+//import com.orbitz.consul.KeyValueClient;
+//import com.orbitz.consul.cache.KVCache;
+//import com.orbitz.consul.model.kv.Value;
+//import com.pszymczyk.consul.ConsulProcess;
+//import com.pszymczyk.consul.ConsulStarterBuilder;
+//import org.junit.jupiter.api.AfterAll;
+//import org.junit.jupiter.api.Assertions;
+//import org.junit.jupiter.api.BeforeAll;
+//import org.junit.jupiter.api.Test;
+//
+//import java.util.Arrays;
+//import java.util.Optional;
+//import java.util.TreeSet;
+//
+//import static org.junit.jupiter.api.Assertions.assertEquals;
+//
+///**
+// *
+// */
+//public class ConsulDynamicConfigurationTest {
+//
+// private static ConsulProcess consul;
+// private static URL configCenterUrl;
+// private static ConsulDynamicConfiguration configuration;
+//
+// private static Consul client;
+// private static KeyValueClient kvClient;
+//
+// @BeforeAll
+// public static void setUp() throws Exception {
+// consul = ConsulStarterBuilder.consulStarter()
+// .build()
+// .start();
+// configCenterUrl = URL.valueOf("consul://127.0.0.1:" +
consul.getHttpPort());
+//
+// configuration = new ConsulDynamicConfiguration(configCenterUrl);
+// client =
Consul.builder().withHostAndPort(HostAndPort.fromParts("127.0.0.1",
consul.getHttpPort())).build();
+// kvClient = client.keyValueClient();
+// }
+//
+// @AfterAll
+// public static void tearDown() throws Exception {
+// consul.close();
+// configuration.close();
+// }
+//
+// @Test
+// public void testGetConfig() {
+// kvClient.putValue("/dubbo/config/dubbo/foo", "bar");
+// // test equals
+// assertEquals("bar", configuration.getConfig("foo", "dubbo"));
+// // test does not block
+// assertEquals("bar", configuration.getConfig("foo", "dubbo"));
+// Assertions.assertNull(configuration.getConfig("not-exist", "dubbo"));
+// }
+//
+// @Test
+// public void testPublishConfig() {
+// configuration.publishConfig("value", "metadata", "1");
+// // test equals
+// assertEquals("1", configuration.getConfig("value", "/metadata"));
+// assertEquals("1",
kvClient.getValueAsString("/dubbo/config/metadata/value").get());
+// }
+//
+// @Test
+// public void testAddListener() {
+// KVCache cache = KVCache.newCache(kvClient,
"/dubbo/config/dubbo/foo");
+// cache.addListener(newValues -> {
+// // Cache notifies all paths with "foo" the root path
+// // If you want to watch only "foo" value, you must filter other
paths
+// Optional<Value> newValue = newValues.values().stream()
+// .filter(value -> value.getKey().equals("foo"))
+// .findAny();
+//
+// newValue.ifPresent(value -> {
+// // Values are encoded in key/value store, decode it if needed
+// Optional<String> decodedValue =
newValue.get().getValueAsString();
+// decodedValue.ifPresent(v ->
System.out.println(String.format("Value is: %s", v))); //prints "bar"
+// });
+// });
+// cache.start();
+//
+// kvClient.putValue("/dubbo/config/dubbo/foo", "new-value");
+// kvClient.putValue("/dubbo/config/dubbo/foo/sub", "sub-value");
+// kvClient.putValue("/dubbo/config/dubbo/foo/sub2", "sub-value2");
+// kvClient.putValue("/dubbo/config/foo", "parent-value");
+//
+// System.out.println(kvClient.getKeys("/dubbo/config/dubbo/foo"));
+// System.out.println(kvClient.getKeys("/dubbo/config"));
+// System.out.println(kvClient.getValues("/dubbo/config/dubbo/foo"));
+// }
+//
+// @Test
+// public void testGetConfigKeys() {
+// configuration.publishConfig("v1", "metadata", "1");
+// configuration.publishConfig("v2", "metadata", "2");
+// configuration.publishConfig("v3", "metadata", "3");
+// // test equals
+// assertEquals(new TreeSet(Arrays.asList("v1", "v2", "v3")),
configuration.getConfigKeys("metadata"));
+// }
+//}
diff --git a/dubbo-configcenter-extensions/dubbo-configcenter-etcd/pom.xml
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/pom.xml
new file mode 100644
index 0000000..fd5aeb6
--- /dev/null
+++ b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-configcenter-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-configcenter-etcd</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The etcd implementation of the config-center api</description>
+
+ <!-- skipIntegrationTests is set to true here and fed into the surefire skipTests setting in the build section, so this module's tests are skipped by default. -->
+ <properties>
+ <skipIntegrationTests>true</skipIntegrationTests>
+ </properties>
+
+ <dependencies>
+ <!-- Test-only: jetcd launcher and testcontainers (the test imports io.etcd.jetcd.launcher.EtcdCluster). -->
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Compile dependencies: Dubbo core API plus the etcd3 remoting module that provides JEtcdClient. -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ </dependency>
+ <!-- NOTE(review): this version is hard-coded while the parent reference uses ${revision}; confirm that is intended. -->
+ <dependency>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Surefire skips tests whenever skipIntegrationTests (declared in the properties section) is true. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${skipIntegrationTests}</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
new file mode 100644
index 0000000..59b384b
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.dubbo.common.constants.CommonConstants.CONFIG_NAMESPACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+
+/**
+ * The etcd implementation of {@link DynamicConfiguration}.
+ *
+ * <p>Values are read through a {@link JEtcdClient}; change notifications use
+ * one gRPC watch stream per registered {@link ConfigurationListener}. All
+ * registered watches are re-issued whenever the client reports
+ * {@link StateListener#CONNECTED}.
+ */
+public class EtcdDynamicConfiguration implements DynamicConfiguration {
+
+    /**
+     * The final root path would be: /$NAME_SPACE/config
+     */
+    private final String rootPath;
+
+    /**
+     * The etcd client
+     */
+    private final JEtcdClient etcdClient;
+
+    /**
+     * Maps each registered listener to its {@link EtcdConfigWatcher}.
+     * Initialised at declaration so it already exists when the state listener
+     * registered in the constructor fires.
+     */
+    private final ConcurrentMap<ConfigurationListener, EtcdConfigWatcher> watchListenerMap =
+            new ConcurrentHashMap<>();
+
+    /**
+     * @param url config center URL; its namespace parameter (falling back to
+     *            {@code DEFAULT_GROUP}) becomes the first segment of the root path
+     */
+    EtcdDynamicConfiguration(URL url) {
+        rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
+        etcdClient = new JEtcdClient(url);
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+                    recover();
+                } catch (Exception ignored) {
+                    // Best effort: a failed re-subscription must not break the
+                    // client's state notification.
+                }
+            }
+        });
+    }
+
+    /**
+     * Starts watching {@code key} in {@code group} for the given listener.
+     * A listener that is already registered is left untouched.
+     */
+    @Override
+    public void addListener(String key, String group, ConfigurationListener listener) {
+        EtcdConfigWatcher watcher = new EtcdConfigWatcher(key, group, listener);
+        // putIfAbsent makes "register once" atomic; only the winner opens a watch.
+        if (watchListenerMap.putIfAbsent(listener, watcher) == null) {
+            watcher.watch();
+        }
+    }
+
+    /**
+     * Cancels and forgets the watch of the given listener. Unknown listeners are
+     * ignored. The watcher is removed from the map so that a later reconnect does
+     * not re-subscribe it.
+     */
+    @Override
+    public void removeListener(String key, String group, ConfigurationListener listener) {
+        EtcdConfigWatcher watcher = watchListenerMap.remove(listener);
+        if (watcher != null) {
+            watcher.cancelWatch();
+        }
+    }
+
+    @Override
+    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
+        return (String) getInternalProperty(convertKey(group, key));
+    }
+
+//    @Override
+//    public String getConfigs(String key, String group, long timeout) throws IllegalStateException {
+//        if (StringUtils.isEmpty(group)) {
+//            group = DEFAULT_GROUP;
+//        }
+//        return (String) getInternalProperty(convertKey(group, key));
+//    }
+
+    @Override
+    public Object getInternalProperty(String key) {
+        return etcdClient.getKVValue(key);
+    }
+
+    /** Returns {@code rootPath/group}, substituting {@code DEFAULT_GROUP} for an empty group. */
+    private String buildPath(String group) {
+        String actualGroup = StringUtils.isEmpty(group) ? DEFAULT_GROUP : group;
+        return rootPath + PATH_SEPARATOR + actualGroup;
+    }
+
+    /** Returns the full etcd key {@code rootPath/group/key}. */
+    private String convertKey(String group, String key) {
+        return buildPath(group) + PATH_SEPARATOR + key;
+    }
+
+    /** Re-issues the watch request of every registered watcher. */
+    private void recover() {
+        for (EtcdConfigWatcher watcher : watchListenerMap.values()) {
+            watcher.watch();
+        }
+    }
+
+    /**
+     * Watches a single etcd key over gRPC and forwards every received event to
+     * one {@link ConfigurationListener}.
+     */
+    public class EtcdConfigWatcher implements StreamObserver<WatchResponse> {
+
+        private ConfigurationListener listener;
+        protected WatchGrpc.WatchStub watchStub;
+        private StreamObserver<WatchRequest> observer;
+        protected long watchId;
+        private ManagedChannel channel;
+
+        private final String key;
+
+        private final String group;
+
+        private String normalizedKey;
+
+        public EtcdConfigWatcher(String key, String group, ConfigurationListener listener) {
+            this.key = key;
+            this.group = group;
+            this.normalizedKey = convertKey(group, key);
+            this.listener = listener;
+            this.channel = etcdClient.getChannel();
+        }
+
+        /**
+         * Records the watch id of the response and turns each etcd event into a
+         * {@link ConfigChangedEvent}: DELETE events map to
+         * {@link ConfigChangeType#DELETED}, everything else to
+         * {@link ConfigChangeType#MODIFIED}.
+         */
+        @Override
+        public void onNext(WatchResponse watchResponse) {
+            this.watchId = watchResponse.getWatchId();
+            for (Event etcdEvent : watchResponse.getEventsList()) {
+                ConfigChangeType type = ConfigChangeType.MODIFIED;
+                if (etcdEvent.getType() == Event.EventType.DELETE) {
+                    type = ConfigChangeType.DELETED;
+                }
+                ConfigChangedEvent event = new ConfigChangedEvent(key, group,
+                        etcdEvent.getKv().getValue().toString(UTF_8), type);
+                listener.process(event);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            // ignore
+        }
+
+        @Override
+        public void onCompleted() {
+            // ignore
+        }
+
+        public long getWatchId() {
+            return watchId;
+        }
+
+        /** Opens a new watch stream on the channel and requests a watch on the normalized key. */
+        private void watch() {
+            watchStub = WatchGrpc.newStub(channel);
+            observer = watchStub.watch(this);
+            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+                    .setKey(ByteString.copyFromUtf8(normalizedKey))
+                    .setProgressNotify(true);
+            WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
+            observer.onNext(req);
+        }
+
+        /** Sends a cancel request for the current watch id; does nothing if no watch was opened yet. */
+        private void cancelWatch() {
+            if (observer == null) {
+                return;
+            }
+            WatchCancelRequest watchCancelRequest =
+                    WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+            WatchRequest cancelRequest = WatchRequest.newBuilder()
+                    .setCancelRequest(watchCancelRequest).build();
+            observer.onNext(cancelRequest);
+        }
+    }
+}
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
new file mode 100644
index 0000000..269cee6
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import
org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+
+/**
+ * {@link AbstractDynamicConfigurationFactory} for etcd: hands out
+ * {@link EtcdDynamicConfiguration} instances.
+ */
+public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+    /**
+     * Creates a new etcd-backed configuration for the given config center URL.
+     *
+     * @param url the config center URL passed on to {@link EtcdDynamicConfiguration}
+     * @return a freshly constructed {@link EtcdDynamicConfiguration}
+     */
+    @Override
+    protected DynamicConfiguration createDynamicConfiguration(URL url) {
+        final DynamicConfiguration etcdConfiguration = new EtcdDynamicConfiguration(url);
+        return etcdConfiguration;
+    }
+}
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
new file mode 100644
index 0000000..8fbb5a5
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
new file mode 100644
index 0000000..e6ed4b5
--- /dev/null
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
+import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.remoting.etcd.Constants.SESSION_TIMEOUT_KEY;
+
+/**
+ * Unit test for etcd config center support.
+ * Integrate with https://github.com/etcd-io/jetcd#launcher
+ * <p>
+ * Each test method starts the launcher cluster in {@link #setUp()} and releases both the
+ * jetcd client and the cluster in {@link #tearDown()}.
+ */
+public class EtcdDynamicConfigurationTest {
+
+    /** Configuration under test; created in {@link #setUp()} for every test method. */
+    private EtcdDynamicConfiguration config;
+
+    public EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster(getClass().getSimpleName(), 3, false);
+
+    /** Plain jetcd client used to write keys directly, bypassing {@link #config}. */
+    private Client client;
+
+    /**
+     * Values written directly to etcd must be readable through
+     * {@link EtcdDynamicConfiguration#getConfig(String, String)}.
+     */
+    @Test
+    public void testGetConfig() {
+        put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello");
+        put("/dubbo/config/test/dubbo.properties", "aaa=bbb");
+
+        Assert.assertEquals("hello",
+            config.getConfig("org.apache.dubbo.etcd.testService.configurators", DynamicConfiguration.DEFAULT_GROUP));
+        Assert.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test"));
+    }
+
+    /**
+     * Registers two listeners per key, writes each key once and expects every listener to be
+     * notified exactly once with the value written to its key.
+     */
+    @Test
+    public void testAddListener() throws Exception {
+        CountDownLatch latch = new CountDownLatch(4);
+        TestListener listener1 = new TestListener(latch);
+        TestListener listener2 = new TestListener(latch);
+        TestListener listener3 = new TestListener(latch);
+        TestListener listener4 = new TestListener(latch);
+        config.addListener("AService.configurators", listener1);
+        config.addListener("AService.configurators", listener2);
+        config.addListener("testapp.tagrouters", listener3);
+        config.addListener("testapp.tagrouters", listener4);
+
+        put("/dubbo/config/AService/configurators", "new value1");
+        Thread.sleep(200);
+        put("/dubbo/config/testapp/tagrouters", "new value2");
+        Thread.sleep(200);
+        // Written to a key nobody listens on; the per-key count assertions below would
+        // fail if this write were delivered to listener3/listener4.
+        put("/dubbo/config/testapp", "new value3");
+
+        Thread.sleep(1000);
+
+        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+        Assert.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators"));
+        Assert.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators"));
+        Assert.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters"));
+        Assert.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters"));
+
+        Assert.assertEquals("new value1", listener1.getValue());
+        Assert.assertEquals("new value1", listener2.getValue());
+        Assert.assertEquals("new value2", listener3.getValue());
+        Assert.assertEquals("new value2", listener4.getValue());
+    }
+
+    /**
+     * Listener that records how often each key changed and the content of the latest event.
+     * <p>
+     * All accessors are synchronized because events are delivered by the configuration
+     * while the test thread reads the recorded state.
+     */
+    private static class TestListener implements ConfigurationListener {
+        private final CountDownLatch latch;
+        private final Map<String, Integer> countMap = new HashMap<>();
+        private String value;
+
+        public TestListener(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public synchronized void process(ConfigChangedEvent event) {
+            countMap.merge(event.getKey(), 1, Integer::sum);
+            value = event.getContent();
+            latch.countDown();
+        }
+
+        /**
+         * @param key the event key to look up
+         * @return number of events seen for {@code key}; 0 (rather than an unboxing NPE)
+         *         when the key was never notified, so assertions fail with a clear message
+         */
+        public synchronized int getCount(String key) {
+            return countMap.getOrDefault(key, 0);
+        }
+
+        /**
+         * @return content of the most recent event, or {@code null} if none arrived
+         */
+        public synchronized String getValue() {
+            return value;
+        }
+    }
+
+    /**
+     * Writes {@code value} under {@code key} and waits for etcd to acknowledge it.
+     * A failed write fails the test immediately instead of being printed and ignored,
+     * which would otherwise surface later as an unrelated assertion failure.
+     */
+    private void put(String key, String value) {
+        try {
+            client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)).get();
+        } catch (InterruptedException e) {
+            // Preserve the interrupt status for the test runner.
+            Thread.currentThread().interrupt();
+            throw new AssertionError("Interrupted while putting value to etcd, key: " + key, e);
+        } catch (Exception e) {
+            throw new AssertionError("Error put value to etcd, key: " + key, e);
+        }
+    }
+
+    @Before
+    public void setUp() {
+        etcdCluster.start();
+
+        List<URI> clientEndPoints = etcdCluster.getClientEndpoints();
+        client = Client.builder().endpoints(clientEndPoints).build();
+
+        String ipAddress = clientEndPoints.get(0).getHost() + ":" + clientEndPoints.get(0).getPort();
+        String urlForDubbo = "etcd3://" + ipAddress + "/org.apache.dubbo.etcd.testService";
+
+        // timeout in 15 seconds.
+        URL url = URL.valueOf(urlForDubbo)
+            .addParameter(SESSION_TIMEOUT_KEY, 15000);
+        config = new EtcdDynamicConfiguration(url);
+    }
+
+    @After
+    public void tearDown() {
+        try {
+            // client stays null when setUp failed before building it.
+            if (client != null) {
+                client.close();
+            }
+        } finally {
+            etcdCluster.close();
+        }
+    }
+
+}
diff --git a/dubbo-configcenter-extensions/pom.xml
b/dubbo-configcenter-extensions/pom.xml
index d0cdcfb..ea79939 100644
--- a/dubbo-configcenter-extensions/pom.xml
+++ b/dubbo-configcenter-extensions/pom.xml
@@ -27,6 +27,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-configcenter-extensions</artifactId>
+ <packaging>pom</packaging>
<version>${revision}</version>
+ <modules>
+ <module>dubbo-configcenter-consul</module>
+ <module>dubbo-configcenter-etcd</module>
+ </modules>
</project>
diff --git a/dubbo-extensions-dependencies-bom/pom.xml
b/dubbo-extensions-dependencies-bom/pom.xml
index 977804d..f6babb8 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -123,6 +123,10 @@
<grpc.version>1.31.1</grpc.version>
<etcd_launcher_version>0.5.7</etcd_launcher_version>
<netty4_version>4.1.66.Final</netty4_version>
+ <consul_process_version>2.2.1</consul_process_version>
+ <consul_version>1.4.2</consul_version>
+ <consul_client_version>1.3.7</consul_client_version>
+ <test_container_version>1.15.2</test_container_version>
<maven_flatten_version>1.2.5</maven_flatten_version>
</properties>
@@ -396,6 +400,27 @@
<artifactId>netty-all</artifactId>
<version>${netty4_version}</version>
</dependency>
+ <dependency>
+ <groupId>com.pszymczyk.consul</groupId>
+ <artifactId>embedded-consul</artifactId>
+ <version>${consul_process_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.ecwid.consul</groupId>
+ <artifactId>consul-api</artifactId>
+ <version>${consul_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.orbitz.consul</groupId>
+ <artifactId>consul-client</artifactId>
+ <version>${consul_client_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${test_container_version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/pom.xml
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/pom.xml
new file mode 100644
index 0000000..fdd5b0b
--- /dev/null
+++ b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-metadata-report-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- This module pins its own version; only the parent reference above uses ${revision}. -->
+ <version>1.0.0-SNAPSHOT</version>
+
+ <artifactId>dubbo-metadata-report-consul</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-metadata-api</artifactId>
+ </dependency>
+ <!-- ConsulMetadataReport imports ConsulConstants (port defaults) from this module. -->
+ <dependency>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <artifactId>dubbo-configcenter-consul</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ <!-- Provides com.ecwid.consul.v1.ConsulClient used by ConsulMetadataReport. -->
+ <!-- NOTE(review): no version here; presumably managed through dubbo-extensions-dependencies-bom (consul_version), confirm. -->
+ <dependency>
+ <groupId>com.ecwid.consul</groupId>
+ <artifactId>consul-api</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java
new file mode 100644
index 0000000..e678bd4
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metadata.store.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.configcenter.consul.ConsulConstants;
+import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier;
+import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
+import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
+import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
+import
org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
+import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
+import org.apache.dubbo.rpc.RpcException;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * metadata report impl for consul
+ */
+public class ConsulMetadataReport extends AbstractMetadataReport {
+
+ private ConsulClient client;
+
+ public ConsulMetadataReport(URL url) {
+ super(url);
+
+ String host = url.getHost();
+ int port = ConsulConstants.INVALID_PORT != url.getPort() ?
url.getPort() : ConsulConstants.DEFAULT_PORT;
+ client = new ConsulClient(host, port);
+ }
+
+ @Override
+ protected void doStoreProviderMetadata(MetadataIdentifier
providerMetadataIdentifier, String serviceDefinitions) {
+ this.storeMetadata(providerMetadataIdentifier, serviceDefinitions);
+ }
+
+ @Override
+ protected void doStoreConsumerMetadata(MetadataIdentifier
consumerMetadataIdentifier, String value) {
+ this.storeMetadata(consumerMetadataIdentifier, value);
+ }
+
+ @Override
+ protected void doSaveMetadata(ServiceMetadataIdentifier
serviceMetadataIdentifier, URL url) {
+ this.storeMetadata(serviceMetadataIdentifier,
URL.encode(url.toFullString()));
+ }
+
+ @Override
+ protected void doRemoveMetadata(ServiceMetadataIdentifier
serviceMetadataIdentifier) {
+ this.deleteMetadata(serviceMetadataIdentifier);
+ }
+
+ @Override
+ protected List<String> doGetExportedURLs(ServiceMetadataIdentifier
metadataIdentifier) {
+ //todo encode and decode
+ String content = getMetadata(metadataIdentifier);
+ if (StringUtils.isEmpty(content)) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<String>(Arrays.asList(URL.decode(content)));
+ }
+
+ @Override
+ protected void doSaveSubscriberData(SubscriberMetadataIdentifier
subscriberMetadataIdentifier, String urlListStr) {
+ this.storeMetadata(subscriberMetadataIdentifier, urlListStr);
+ }
+
+ @Override
+ protected String doGetSubscribedURLs(SubscriberMetadataIdentifier
subscriberMetadataIdentifier) {
+ return getMetadata(subscriberMetadataIdentifier);
+ }
+
+ private void storeMetadata(BaseMetadataIdentifier identifier, String v) {
+ try {
+ client.setKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY),
v);
+ } catch (Throwable t) {
+ logger.error("Failed to put " + identifier + " to consul " + v +
", cause: " + t.getMessage(), t);
+ throw new RpcException("Failed to put " + identifier + " to consul
" + v + ", cause: " + t.getMessage(), t);
+ }
+ }
+
+ private void deleteMetadata(BaseMetadataIdentifier identifier) {
+ try {
+
client.deleteKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
+ } catch (Throwable t) {
+ logger.error("Failed to delete " + identifier + " from consul ,
cause: " + t.getMessage(), t);
+ throw new RpcException("Failed to delete " + identifier + " from
consul , cause: " + t.getMessage(), t);
+ }
+ }
+
+ private String getMetadata(BaseMetadataIdentifier identifier) {
+ try {
+ Response<GetValue> value =
client.getKVValue(identifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
+ //FIXME CHECK
+ if (value != null && value.getValue() != null) {
+ //todo check decode value and value diff
+ return value.getValue().getValue();
+ }
+ return null;
+ } catch (Throwable t) {
+ logger.error("Failed to get " + identifier + " from consul ,
cause: " + t.getMessage(), t);
+ throw new RpcException("Failed to get " + identifier + " from
consul , cause: " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
+ return getMetadata(metadataIdentifier);
+ }
+}
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java
new file mode 100644
index 0000000..1d1f5bb
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metadata.store.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.report.MetadataReport;
+import org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory;
+
+/**
+ * {@link AbstractMetadataReportFactory} implementation that produces
+ * {@link ConsulMetadataReport} instances.
+ */
+public class ConsulMetadataReportFactory extends AbstractMetadataReportFactory {
+
+    /**
+     * Builds a new {@link ConsulMetadataReport} for the given URL.
+     *
+     * @param url the URL handed unchanged to the {@link ConsulMetadataReport} constructor
+     * @return the newly constructed report
+     */
+    @Override
+    protected MetadataReport createMetadataReport(URL url) {
+        final MetadataReport consulReport = new ConsulMetadataReport(url);
+        return consulReport;
+    }
+}
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
new file mode 100644
index 0000000..1f27535
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.metadata.store.consul.ConsulMetadataReportFactory
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/pom.xml
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/pom.xml
new file mode 100644
index 0000000..d81b751
--- /dev/null
+++ b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dubbo-metadata-report-extensions</artifactId>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- This module pins its own version; only the parent reference above uses ${revision}. -->
+ <version>1.0.0-SNAPSHOT</version>
+
+ <artifactId>dubbo-metadata-report-etcd</artifactId>
+
+ <properties>
+ <!-- Bound to surefire's skipTests in the build section below, so this module's tests are skipped while the value is true. -->
+ <skipIntegrationTests>true</skipIntegrationTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-metadata-api</artifactId>
+ </dependency>
+ <!-- Provides org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient used by EtcdMetadataReport. -->
+ <dependency>
+ <groupId>org.apache.dubbo.extensions</groupId>
+ <artifactId>dubbo-remoting-etcd3</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ <!-- Provides io.etcd.jetcd.launcher.EtcdCluster imported by EtcdMetadataReportTest. -->
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- NOTE(review): presumably needed at test time by the jetcd launcher; confirm. -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- Tests run only when skipIntegrationTests resolves to false. -->
+ <skipTests>${skipIntegrationTests}</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReport.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReport.java
new file mode 100644
index 0000000..7d3eab9
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReport.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.metadata.store.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier;
+import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
+import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
+import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
+import
org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
+import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
+import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+
+/**
+ * Report Metadata to Etcd
+ */
+public class EtcdMetadataReport extends AbstractMetadataReport {
+
+ private final String root;
+
+ /**
+ * The etcd client
+ */
+ private final JEtcdClient etcdClient;
+
+ public EtcdMetadataReport(URL url) {
+ super(url);
+ if (url.isAnyHost()) {
+ throw new IllegalStateException("registry address == null");
+ }
+ String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
+ if (!group.startsWith(PATH_SEPARATOR)) {
+ group = PATH_SEPARATOR + group;
+ }
+ this.root = group;
+ etcdClient = new JEtcdClient(url);
+ }
+
+ @Override
+ protected void doStoreProviderMetadata(MetadataIdentifier
providerMetadataIdentifier, String serviceDefinitions) {
+ storeMetadata(providerMetadataIdentifier, serviceDefinitions);
+ }
+
+ @Override
+ protected void doStoreConsumerMetadata(MetadataIdentifier
consumerMetadataIdentifier, String value) {
+ storeMetadata(consumerMetadataIdentifier, value);
+ }
+
+ @Override
+ protected void doSaveMetadata(ServiceMetadataIdentifier
serviceMetadataIdentifier, URL url) {
+ String key = getNodeKey(serviceMetadataIdentifier);
+ if (!etcdClient.put(key, URL.encode(url.toFullString()))) {
+ logger.error("Failed to put " + serviceMetadataIdentifier + " to
etcd, value: " + url);
+ }
+ }
+
+ @Override
+ protected void doRemoveMetadata(ServiceMetadataIdentifier
serviceMetadataIdentifier) {
+ etcdClient.delete(getNodeKey(serviceMetadataIdentifier));
+ }
+
+ @Override
+ protected List<String> doGetExportedURLs(ServiceMetadataIdentifier
metadataIdentifier) {
+ String content = etcdClient.getKVValue(getNodeKey(metadataIdentifier));
+ if (StringUtils.isEmpty(content)) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<String>(Arrays.asList(URL.decode(content)));
+ }
+
+ @Override
+ protected void doSaveSubscriberData(SubscriberMetadataIdentifier
subscriberMetadataIdentifier, String urlListStr) {
+ String key = getNodeKey(subscriberMetadataIdentifier);
+ if (!etcdClient.put(key, urlListStr)) {
+ logger.error("Failed to put " + subscriberMetadataIdentifier + "
to etcd, value: " + urlListStr);
+ }
+ }
+
+ @Override
+ protected String doGetSubscribedURLs(SubscriberMetadataIdentifier
subscriberMetadataIdentifier) {
+ return etcdClient.getKVValue(getNodeKey(subscriberMetadataIdentifier));
+ }
+
+ @Override
+ public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
+ return etcdClient.getKVValue(getNodeKey(metadataIdentifier));
+ }
+
+ private void storeMetadata(MetadataIdentifier identifier, String v) {
+ String key = getNodeKey(identifier);
+ if (!etcdClient.put(key, v)) {
+ logger.error("Failed to put " + identifier + " to etcd, value: " +
v);
+ }
+ }
+
+ String getNodeKey(BaseMetadataIdentifier identifier) {
+ return toRootDir() + identifier.getUniqueKey(KeyTypeEnum.PATH);
+ }
+
+ String toRootDir() {
+ if (root.equals(PATH_SEPARATOR)) {
+ return root;
+ }
+ return root + PATH_SEPARATOR;
+ }
+}
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportFactory.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportFactory.java
new file mode 100644
index 0000000..3bb9e92
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.metadata.store.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.report.MetadataReport;
+import org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory;
+
+/**
+ * {@link AbstractMetadataReportFactory} implementation whose reports are
+ * {@link EtcdMetadataReport} instances.
+ */
+public class EtcdMetadataReportFactory extends AbstractMetadataReportFactory {
+
+    /**
+     * Builds a new {@link EtcdMetadataReport} for the given URL.
+     *
+     * @param url the URL handed unchanged to the {@link EtcdMetadataReport} constructor
+     * @return the newly constructed report
+     */
+    @Override
+    public MetadataReport createMetadataReport(URL url) {
+        final MetadataReport etcdReport = new EtcdMetadataReport(url);
+        return etcdReport;
+    }
+
+}
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
new file mode 100644
index 0000000..9a3c98c
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.metadata.store.etcd.EtcdMetadataReportFactory
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadata4TstService.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadata4TstService.java
new file mode 100644
index 0000000..1de21ce
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadata4TstService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metadata.store.etcd;
+
+/**
+ * Test interface for Etcd metadata report
+ * <p>
+ * Declares two sample operations and carries no implementation in this file.
+ * NOTE(review): presumably used by EtcdMetadataReportTest as the sample service whose
+ * definition is reported; confirm against that test.
+ */
+public interface EtcdMetadata4TstService {
+
+ /**
+ * Sample operation without parameters.
+ *
+ * @return an int; its meaning is not defined by this interface
+ */
+ int getCounter();
+
+ /**
+ * Sample operation taking a single string and returning nothing.
+ *
+ * @param var the string argument; its meaning is not defined by this interface
+ */
+ void printResult(String var);
+}
diff --git
a/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
new file mode 100644
index 0000000..119522c
--- /dev/null
+++
b/dubbo-metadata-report-extensions/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metadata.store.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.metadata.definition.ServiceDefinitionBuilder;
+import org.apache.dubbo.metadata.definition.model.FullServiceDefinition;
+import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
+import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
+import
org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
+
+import com.google.gson.Gson;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
+
+/**
+ * Integration tests for {@link EtcdMetadataReport}.
+ * <p>
+ * {@link #setUp()} starts an etcd cluster through the jetcd launcher and creates a report that
+ * points at its first client endpoint; {@link #tearDown()} releases both the verification client
+ * and the cluster. Values written through the report are read back with a separate jetcd
+ * {@link Client}, so the assertions check what is actually stored in etcd.
+ * <p>
+ * NOTE(review): the class is {@code @Disabled}, presumably because the launcher needs an
+ * environment that is not available on every build machine - confirm before enabling.
+ */
+@Disabled
+public class EtcdMetadataReportTest {
+
+    private static final String TEST_SERVICE = "org.apache.dubbo.metadata.store.etcd.EtcdMetadata4TstService";
+    private static final String VERSION = "1.0.0";
+    /** The tests never use a group; {@code null} keeps the group out of keys and URLs. */
+    private static final String GROUP = null;
+    private static final String REVISION = "90980";
+    private static final String PROTOCOL = "xxx";
+    private static final String CONSUMER_APPLICATION = "etc-metadata-report-consumer-test";
+
+    private final EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster(getClass().getSimpleName(), 1, false);
+    private Client etcdClientForTest;
+    private EtcdMetadataReport etcdMetadataReport;
+    private URL registryUrl;
+    private EtcdMetadataReportFactory etcdMetadataReportFactory;
+
+    /**
+     * Starts the cluster, connects the verification client and creates the report under test.
+     */
+    @BeforeEach
+    public void setUp() {
+        etcdCluster.start();
+        List<URI> clientEndPoints = etcdCluster.getClientEndpoints();
+        etcdClientForTest = Client.builder().endpoints(clientEndPoints).build();
+
+        URI firstEndpoint = clientEndPoints.get(0);
+        this.registryUrl = URL.valueOf("etcd://" + firstEndpoint.getHost() + ":" + firstEndpoint.getPort());
+        etcdMetadataReportFactory = new EtcdMetadataReportFactory();
+        this.etcdMetadataReport = (EtcdMetadataReport) etcdMetadataReportFactory.createMetadataReport(registryUrl);
+    }
+
+    /**
+     * Releases the verification client and then the cluster.
+     *
+     * @throws Exception if closing the client or the cluster fails
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        try {
+            // The client is null when setUp() failed before it was built.
+            if (etcdClientForTest != null) {
+                etcdClientForTest.close();
+            }
+        } finally {
+            // Always stop the cluster, even if closing the client threw.
+            etcdCluster.close();
+        }
+    }
+
+    /**
+     * A provider definition is absent before it is stored and readable afterwards, both
+     * directly from etcd and through the report.
+     */
+    @Test
+    public void testStoreProvider() throws Exception {
+        String application = "etcd-metdata-report-test";
+        MetadataIdentifier lookupIdentifier =
+            new MetadataIdentifier(TEST_SERVICE, VERSION, GROUP, PROVIDER_SIDE, application);
+
+        Assertions.assertNull(etcdMetadataReport.getServiceDefinition(lookupIdentifier));
+
+        MetadataIdentifier providerIdentifier =
+            storeProvider(etcdMetadataReport, TEST_SERVICE, VERSION, GROUP, application);
+
+        String fileContent = readStoredValue(etcdMetadataReport.getNodeKey(providerIdentifier));
+        FullServiceDefinition fullServiceDefinition = new Gson().fromJson(fileContent, FullServiceDefinition.class);
+        Assertions.assertEquals("etcdTest", fullServiceDefinition.getParameters().get("paramTest"));
+
+        Assertions.assertNotNull(etcdMetadataReport.getServiceDefinition(lookupIdentifier));
+    }
+
+    /**
+     * Consumer metadata is stored as the JSON form of the parameter map.
+     */
+    @Test
+    public void testStoreConsumer() throws Exception {
+        MetadataIdentifier consumerIdentifier =
+            storeConsumer(etcdMetadataReport, TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+
+        String fileContent = readStoredValue(etcdMetadataReport.getNodeKey(consumerIdentifier));
+        Assertions.assertEquals("{\"paramConsumerTest\":\"etcdConsumer\"}", fileContent);
+    }
+
+    /**
+     * {@code doSaveMetadata} stores the encoded full string of the URL under the node key.
+     */
+    @Test
+    public void testDoSaveMetadata() throws ExecutionException, InterruptedException {
+        URL url = generateURL(TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+        ServiceMetadataIdentifier serviceMetadataIdentifier = newServiceMetadataIdentifier();
+        etcdMetadataReport.doSaveMetadata(serviceMetadataIdentifier, url);
+
+        String fileContent = readStoredValue(etcdMetadataReport.getNodeKey(serviceMetadataIdentifier));
+        Assertions.assertEquals(URL.encode(url.toFullString()), fileContent);
+    }
+
+    /**
+     * {@code doRemoveMetadata} deletes the key that {@code doSaveMetadata} wrote.
+     */
+    @Test
+    public void testDoRemoveMetadata() throws ExecutionException, InterruptedException {
+        URL url = generateURL(TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+        ServiceMetadataIdentifier serviceMetadataIdentifier = newServiceMetadataIdentifier();
+        String nodeKey = etcdMetadataReport.getNodeKey(serviceMetadataIdentifier);
+
+        etcdMetadataReport.doSaveMetadata(serviceMetadataIdentifier, url);
+        // Precondition: the value is present, otherwise the removal check proves nothing.
+        readStoredValue(nodeKey);
+
+        etcdMetadataReport.doRemoveMetadata(serviceMetadataIdentifier);
+
+        Assertions.assertTrue(queryEtcd(nodeKey).getKvs().isEmpty());
+    }
+
+    /**
+     * {@code doGetExportedURLs} returns exactly the saved URL, in its unencoded full form.
+     */
+    @Test
+    public void testDoGetExportedURLs() throws ExecutionException, InterruptedException {
+        URL url = generateURL(TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+        ServiceMetadataIdentifier serviceMetadataIdentifier = newServiceMetadataIdentifier();
+        etcdMetadataReport.doSaveMetadata(serviceMetadataIdentifier, url);
+
+        List<String> exportedUrls = etcdMetadataReport.doGetExportedURLs(serviceMetadataIdentifier);
+
+        Assertions.assertEquals(1, exportedUrls.size());
+        Assertions.assertEquals(url.toFullString(), exportedUrls.get(0));
+    }
+
+    /**
+     * {@code doSaveSubscriberData} stores the given string unchanged under the node key.
+     */
+    @Test
+    public void testDoSaveSubscriberData() throws ExecutionException, InterruptedException {
+        URL url = generateURL(TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+        SubscriberMetadataIdentifier subscriberMetadataIdentifier =
+            new SubscriberMetadataIdentifier(CONSUMER_APPLICATION, REVISION);
+        String urlsJson = new Gson().toJson(Arrays.asList(url));
+
+        etcdMetadataReport.doSaveSubscriberData(subscriberMetadataIdentifier, urlsJson);
+
+        String fileContent = readStoredValue(etcdMetadataReport.getNodeKey(subscriberMetadataIdentifier));
+        Assertions.assertEquals(urlsJson, fileContent);
+    }
+
+    /**
+     * {@code doGetSubscribedURLs} returns the string saved by {@code doSaveSubscriberData}.
+     */
+    @Test
+    public void testDoGetSubscribedURLs() throws ExecutionException, InterruptedException {
+        URL url = generateURL(TEST_SERVICE, VERSION, GROUP, CONSUMER_APPLICATION);
+        SubscriberMetadataIdentifier subscriberMetadataIdentifier =
+            new SubscriberMetadataIdentifier(CONSUMER_APPLICATION, REVISION);
+        String urlsJson = new Gson().toJson(Arrays.asList(url));
+        etcdMetadataReport.doSaveSubscriberData(subscriberMetadataIdentifier, urlsJson);
+
+        String fileContent = etcdMetadataReport.doGetSubscribedURLs(subscriberMetadataIdentifier);
+
+        Assertions.assertEquals(urlsJson, fileContent);
+    }
+
+    /**
+     * Builds the full service definition of {@code interfaceName} and stores it as provider
+     * metadata.
+     *
+     * @return the identifier the definition was stored under
+     * @throws ClassNotFoundException if {@code interfaceName} cannot be loaded
+     * @throws InterruptedException   if the wait after storing is interrupted
+     */
+    private MetadataIdentifier storeProvider(EtcdMetadataReport etcdMetadataReport, String interfaceName,
+                                             String version, String group, String application)
+        throws ClassNotFoundException, InterruptedException {
+        URL url = buildUrl(4444, interfaceName, version, group, application);
+
+        MetadataIdentifier providerMetadataIdentifier =
+            new MetadataIdentifier(interfaceName, version, group, PROVIDER_SIDE, application);
+        Class<?> interfaceClass = Class.forName(interfaceName);
+        FullServiceDefinition fullServiceDefinition =
+            ServiceDefinitionBuilder.buildFullDefinition(interfaceClass, url.getParameters());
+
+        etcdMetadataReport.storeProviderMetadata(providerMetadataIdentifier, fullServiceDefinition);
+        // Presumably the store call completes asynchronously; wait before reading back.
+        Thread.sleep(1000);
+        return providerMetadataIdentifier;
+    }
+
+    /**
+     * Creates the URL used by the {@code doSave*}/{@code doGet*} tests (port 8989).
+     */
+    private URL generateURL(String interfaceName, String version, String group, String application) {
+        return buildUrl(8989, interfaceName, version, group, application);
+    }
+
+    /**
+     * Stores a one-entry parameter map as consumer metadata.
+     *
+     * @return the identifier the map was stored under
+     * @throws InterruptedException if the wait after storing is interrupted
+     */
+    private MetadataIdentifier storeConsumer(EtcdMetadataReport etcdMetadataReport, String interfaceName,
+                                             String version, String group, String application)
+        throws InterruptedException {
+        MetadataIdentifier consumerIdentifier =
+            new MetadataIdentifier(interfaceName, version, group, CONSUMER_SIDE, application);
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("paramConsumerTest", "etcdConsumer");
+
+        etcdMetadataReport.storeConsumerMetadata(consumerIdentifier, parameters);
+        // Presumably the store call completes asynchronously; wait before reading back.
+        Thread.sleep(1000);
+        return consumerIdentifier;
+    }
+
+    /**
+     * Builds an {@code xxx://} URL on the local host name; the group parameter is appended
+     * only when {@code group} is not {@code null}.
+     */
+    private URL buildUrl(int port, String interfaceName, String version, String group, String application) {
+        String groupParameter = group == null ? "" : "&group=" + group;
+        return URL.valueOf("xxx://" + NetUtils.getLocalAddress().getHostName() + ":" + port + "/" + interfaceName
+            + "?paramTest=etcdTest&version=" + version + "&application=" + application + groupParameter);
+    }
+
+    /**
+     * Creates the provider-side service identifier shared by the save/remove/export tests.
+     */
+    private ServiceMetadataIdentifier newServiceMetadataIdentifier() {
+        return new ServiceMetadataIdentifier(TEST_SERVICE, VERSION, GROUP, PROVIDER_SIDE, REVISION, PROTOCOL);
+    }
+
+    /**
+     * Fetches {@code nodeKey} from etcd with the verification client and waits for the answer.
+     */
+    private GetResponse queryEtcd(String nodeKey) throws ExecutionException, InterruptedException {
+        return etcdClientForTest.getKVClient()
+            .get(ByteSequence.from(nodeKey, StandardCharsets.UTF_8))
+            .get();
+    }
+
+    /**
+     * Returns the UTF-8 value stored under {@code nodeKey}, failing the test with a readable
+     * message (rather than an index error) when the key does not exist.
+     */
+    private String readStoredValue(String nodeKey) throws ExecutionException, InterruptedException {
+        GetResponse response = queryEtcd(nodeKey);
+        Assertions.assertFalse(response.getKvs().isEmpty(), "no value stored in etcd under key " + nodeKey);
+        return response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8);
+    }
+}
diff --git a/dubbo-metadata-report-extensions/pom.xml
b/dubbo-metadata-report-extensions/pom.xml
index 0a69ed8..bdce3c7 100644
--- a/dubbo-metadata-report-extensions/pom.xml
+++ b/dubbo-metadata-report-extensions/pom.xml
@@ -28,4 +28,10 @@
<artifactId>dubbo-metadata-report-extensions</artifactId>
<version>${revision}</version>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>dubbo-metadata-report-consul</module>
+ <module>dubbo-metadata-report-etcd</module>
+ </modules>
</project>